


MIT/LCS/TR-207 



ROBUST CONCURRENCY CONTROL FOR A DISTRIBUTED 
INFORMATION SYSTEM 



Warren A. Montgomery 



This research was supported by the Advanced Research 
Projects Agency of the Department of Defense and was 
monitored by the Office of Naval Research under 
Contract No. N00014-75-C-0661 









ROBUST CONCURRENCY CONTROL 
FOR A DISTRIBUTED INFORMATION SYSTEM 



by 



Warren A. Montgomery 



December, 1978 



© Massachusetts Institute of Technology 1978 



This research was supported by the Advanced Research Projects Agency of the Department
of Defense and was monitored by the Office of Naval Research under Contract No.
N00014-75-C-0661.



Massachusetts Institute of Technology 
Laboratory for Computer Science 



Cambridge, Massachusetts 02139



ACKNOWLEDGEMENTS 

Many people aided greatly in the production of this thesis. A complete 
acknowledgement would no doubt run longer than the work itself. My most sincere thanks 
to those who have not been included in this brief list, but none the less made my stay at 
MIT stimulating and productive. 

I commend Professor L. Svobodova for her great endurance in reading many poorly 
written drafts of my early ideas and for her prompt response to each of my attempts to clarify 
those ideas. She was most helpful in that she could usually figure out what I meant to say, 
even when I was not sure myself, and yet still was able to read each draft as if she had not 
seen the work before. Dr. D. Clark was most helpful in providing key suggestions to clear 
up many of the vague areas, and his faith and interest in my early ideas were very much 
appreciated. 

Professors F. J. Corbato and M. Hammer, my readers, provided many helpful 
suggestions for improving the thesis at and after my thesis examination. They were able to 
point out areas for improvement that were not so apparent to one who was caught up in the 
work. 

I would like to thank Jim Gray of IBM Research, San Jose, and Dave Reed for
several productive discussions of ideas in and related to my thesis. Such discussions were 
particularly helpful in providing early guidance on the merits of several of my ideas. Dave 
Reed must also be thanked for his help in creating many text-processing tools that were used 
in preparing the finished thesis. 

I would like to thank all of the members of the Computer Systems Research group 
for providing an environment in which ideas were freely discussed and explored. They were 
also most helpful in being my agents after I had left M.I.T. I am extremely grateful for
Allen Luniewski's help in relaying copies of the thesis between the printer at M.I.T. and
myself in Illinois. My fellow workers and management at Bell Laboratories must also be 
thanked for their support in encouraging me to finish my thesis at a time during which it 
was most difficult to concentrate on the work. 

My special thanks go to my wife, Carla, who endured five and a half long years of 
my career as a graduate student. She was always most understanding, even at times when 
her own thesis work was not going well. Her efforts in improving my writing have had a 
great impact on the quality and clarity of the final copy. Most of all, I would like to thank 
her for her unending confidence in my ability to finish, and for providing a strong incentive 
for doing so. 

I would also like to thank the National Science Foundation for their support of my 
first three years at M.I.T. through the graduate fellowship program. 




ROBUST CONCURRENCY CONTROL 
FOR A DISTRIBUTED INFORMATION SYSTEM

by

WARREN A. MONTGOMERY

Submitted to the Department of Electrical Engineering and Computer Science

on December 1, 1978 in partial fulfillment of the requirements

for the Degree of Doctor of Philosophy. 



Abstract 

This dissertation presents a collection of protocols for coordinating transactions in a
distributed information system. The system is modeled as a collection of processes that 
communicate only through message passing. Each process manages some portion of the data 
base, and several processes may cooperate in performing a single transaction. 

The thesis presents a model for computation in a distributed information system in 
which the sites and communication links may fail. The effects of such failures on the 
computation are described in the model. The thesis discusses implementation techniques that 
could be used to limit the effects of failures in a real system to those described in the model. 

A hierarchical protocol for coordinating transactions is presented. The accesses to be 
performed during a transaction are pre-analyzed to select the protocols needed to coordinate
the processes that participate in the implementation of the transaction. This analysis can be 
used to guide the organization of the data base so as to minimize the amount of locking 
required in performing frequent or important transactions. An important aspect of this 
mechanism is that it allows transactions that cannot accurately be pre-analyzed to be 
performed and correctly synchronized without severely degrading the performance of the 
system in performing more predictable transactions. 

A novel approach to the problem of making updates at several different sites 
atomically is also discussed. This approach is based on the notion of a polyvalue, which is
used to represent two or more possible values for a single data item. A polyvalue is created 
for an item involved in an update that has been delayed due to a failure. By assigning a 
polyvalue to such an item, that item can be made accessible to subsequent transactions, rather 
than remaining locked until the update can be completed. A polyvalue describes the possible 
values that may be correct for an item, depending on the outcome of transactions that have 
been interrupted by failures. Frequently, the important effects of a transaction (such as
the payment of money) can be determined without knowing the exact values of the items in
the data base. A polyvalue for an item that is accessed by such a transaction may be
sufficient to determine such effects. By using polyvalues, we can guarantee that a data item
will not be made inaccessible by any failure other than a failure of the site that holds the 
item. 



A strong motivation for the development of these protocols is the desire that the
individual sites of a distributed information system act independently, and that a site or a
group of sites be able to continue local processing operations when a failure has isolated
them from the rest of the sites. Many of the previous coordination mechanisms have only
considered the continued operation of the sites that remain with the system to be important.
Another motivating factor for the development of these protocols is the idea that in many
applications, the processing to be performed exhibits a high degree of locality of reference, in
that most operations involve only a small number of sites. By structuring the coordination
mechanism to take advantage of this locality of reference, one can have protocols that are
simple, efficient, and [illegible] the particular application.



keywords: distributed data bases, synchronization, message passing systems, reliability. 






CONTENTS

Acknowledgements
Abstract
Table of Contents
Table of Figures

1. Introduction
1.1. Reasons For Distribution
1.2. The Concurrency Control Problem in a Distributed Information System
1.3. Basic Assumptions and Goals
1.4. Related Work
1.5. Thesis Plan

2. The Process Model of Distributed Computing
2.1. The Model
2.2. Atomic Transactions Revisited
2.3. Summary

3. Atomic Broadcasting
3.1. Definitions
3.2. An Illustration of Atomic Broadcasting
3.3. A Mechanism for Atomic Broadcasting
3.4. Other Ordering Restrictions on Broadcast Messages
3.5. Implementation
3.6. Evaluation
3.7. Summary

4. Atomic Transactions in the Process Model
4.1. Analysis of Transactions
4.2. A Simple Approach to Transaction Synchronization
4.3. Classes of Transactions
4.4. A Hierarchical Scheme for Transaction Synchronization
4.5. Implementation of Hierarchical Locking
4.6. A Rejected Alternative Solution
4.7. Conclusions and Summary

5. Polyvalues: A Mechanism for Performing Atomic Updates to Distributed Data
5.1. Motivation (The Trouble with Locking)
5.2. The Polyvalue Mechanism for Avoiding Delay Due to Locking
5.3. Recovery of Pending Transactions
5.4. Use of Polyvalues in the Hierarchical Locking Scheme
5.5. Restricting the [illegible] of Polyvalues
5.6. Summary

6. Application of the Techniques to [illegible] System
6.1. The Problem
6.2. Analysis of the Transactions
6.4. Summary

7. Conclusions and Areas for Further Research
7.1. Summary of Thesis Work
7.2. Areas for Further Research
7.3. Summary

References

A. Proofs of the Protocols
A.1. Formalization of Atomic Broadcasting
A.2. Proof of Atomic Broadcasting
A.3. Correct Relative Sequencing of Broadcasts

B. An Analysis of the Propagation of Polyvalues
B.1. A Model for the Creation and Deletion of Polyvalues
B.2. Simulation of the Use of Polyvalues

Biographical Note



FIGURES

Figure 2.1 The Execution History of a Process
Figure 3.1 The Abbreviated Execution History of a Process
Figure 3.2 Non-Atomic Broadcasting
Figure 3.3 Coordinating Atomic Broadcasts with Message Forwarders
Figure 3.4 Moving a Process
Figure 3.5 A Physical Communication Topology
Figure 3.6 A Logical Topology for the Network of Figure 3.5
Figure 4.1 A Simple Transaction Graph
Figure 4.2 An Activity Graph For an Implementation of T
Figure 4.3 A Joint Activity Graph
Figure 4.4 A Transaction Using Delayed Locking
Figure 4.5 Concurrency Restrictions Due to Hierarchical Structure
Figure 5.1 A Two-Phase Commit Protocol
Figure 5.2 Recovery of Pending Transactions
Table 6.1 Transactions for Inventory Control
Figure 6.1 Transaction Graphs for Inventory Transactions
Figure 6.2 A Joint Transaction Graph of The Inventory Transactions
Figure 6.3 An Activity Graph for a Simple Data Base Organization
Figure 6.4 An Activity Graph for a More Efficient Organization of the Data
Figure 6.5 A More Complete Activity Graph
Figure 6.6 An Activity Graph for a Redundant Data Base Organization
Table B.1 Typical Predictions of the Number of Polyvalues in a Database
Table B.2 Results of the Simulation of Polyvalues




Chapter 1
Introduction 



Recent developments in electronic technology have made practical the interconnection 
of a large number of computer systems to form what I will refer to as a distributed 
information system. Each of the computer systems (or sites, as they are more frequently
called) in the resulting system maintains some information and tools for accessing that
information. The sites that make up a distributed information system may not be under the
control of a single administrative authority. A distributed information system allows any
user of any of the individual sites controlled access to the entire body of information
managed by the system, while it allows each of the individual computer systems to control
the use of the tools and information that it holds.

1.1 Reasons For Distribution 

There are several good reasons for choosing such an organization for an information 
system rather than placing all of the information in a single large, shared computing facility.
I will discuss some of these reasons briefly. 

1.1.1 Autonomy 

A very important reason for choosing a distributed organization for an information 
system is the autonomy of the individual sites. A recent study [D'Oliveira77] has shown that
the ability to partition the authority and responsibility for information management in a
distributed system is the most important reason for many businesses considering distributed
information systems. In a distributed system, each site has control over the information that
it manages, and can set its own policies for controlling the availability of that information.
As we shall see, autonomy has important implications for the assumptions that can be made
about the cooperation of individual sites in the execution of processing operations, and for
the protocols that can be used to coordinate such operations.

1.1.2 Reliability 

A second reason for distribution is reliability. There are two ways in which a
distributed information system can be made more reliable than a central facility. One way to
achieve greater reliability in a distributed system is to replicate information, storing it at two
or more of the sites in a distributed system. Replication increases the availability of
information in a system with unreliable sites. A single failure does not make replicated
information inaccessible. Unfortunately, modifying replicated information is much more
difficult than modifying non-redundantly stored information. While a great deal of research
has gone into the development of protocols to update replicated data, the problem remains
difficult, and such updates are costly in that they require extensive communication between
sites, reducing the economic advantage of distribution.

A second source of increased reliability, and one which I consider to be much more
important, is that the failure of a single site or communication link does not necessarily make
the entire system fail, while in a single, centralized system, the failure of a single component
frequently interrupts all processing in progress. The individual sites in a distributed
information system will be smaller and simpler than a single huge computer system with
storage and processing power equivalent to the total of that of the individual sites. This
simplicity should mean that the sites in a distributed system fail less frequently than the
single machine of a centralized system. Thus if a distributed system can be constructed so as
to limit the effects of a failure at one site to the interruption of processing that requires
information at that site, the reliability of a distributed information system as seen by any
individual user will be substantially better than that of a single shared machine.

1.1.3 Economics 

A third reason for distribution is an economic advantage that makes a group of 
small computer systems less costly to manufacture than an "equivalent" single large machine. 
A single computer with a certain processing rate and storage capacity costs substantially more 
than a collection of smaller machines with the same aggregate processing rate and storage 
size. In addition to the computing hardware, communication and software development 
contribute to the cost of a distributed information system. Frequently, the information to be 
managed can be partitioned in such a way that most of the processing operations do not
require information from more than one of the partitions. Each partition can be assigned to
a small computer system capable of performing the processing required for the information
in that partition. The cost of communication between sites in such a system would be
relatively small. If the extra cost of developing software for a distributed information system
can be kept small, a distributed information system may be substantially less costly than an
equivalent central facility.

1.1.4 Flexibility 

A fourth reason for distribution is flexibility. Changes in the amount of information 
to be managed by the system can require increasing or decreasing the storage and processing 
capacity. In a central system, this may require replacing the entire machine with one of a
different capacity. In the distributed system, capacity changes can frequently be
accomplished by adding or deleting sites, with minimal impact on the sites not being
changed.

Consider, for example, a corporation that has just acquired a subsidiary, and needs to
modify its administrative information management system to manage the new subsidiary.
Merging the information management systems of the parent company and the subsidiary
into a single central facility could be very difficult. If the information management system
being used by the corporation is distributed, however, the merger can be accomplished by
adding one or more sites to manage the subsidiary.¹



1.2 The Concurrency Control Problem in a Distributed Information System 

Several problems must be overcome in order to make a distributed information
system as easy to use as a central facility. The subject of this thesis, and what I believe to be
the most difficult of these problems, is controlling the sequencing of user-specified processing
operations. The result of performing such processing operations concurrently should be the
same as that obtained by performing them in some sequential order. Before this problem
can be discussed in detail, we must have a more precise definition of the way in which stored
information can be manipulated. For this purpose, I adopt terminology that has commonly
been used in data base systems.



1. In rare cases, the existing information systems of the parent and the subsidiary may be
compatible, requiring virtually no effort for the merger. Even if the information
management system of the subsidiary must be substantially modified to fit into the parent's
distributed system, this effort should be less than that required to merge both into a single
shared facility.



The stored information consists of a set of individual data items, each of which 
represents some independently accessible piece of information. For each data item there is a
current value that is the information that that item currently contains.¹ A data base state is
a mapping from the set of items that makes up the data base to the set of values, specifying 
the current value of each item in the data base. 

The high-level operations that are to be performed on stored information are known 
as transactions. A transaction can be viewed as a function mapping one data base state to
another. Each transaction is performed as a set of primitive operations, called accesses, on
individual data items. Some accesses to an item cause the current value of that item to be
changed, and are known as updates. The set of items whose values are changed by the
transaction are the output items of the transaction.² The new values produced by the
transaction for these items are known as the output values of the transaction. Each
transaction computes its output values based on the values of the items in the data base state
that is the input to the transaction. The items that are used by the transaction in computing
the output values are referred to as input items, and their values as supplied to the
transactions are the input values of the transaction. 
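
The definitions above can be illustrated with a minimal Python sketch. It is not taken from
the thesis: the names State, Transaction, apply and transfer, and the items "a", "b" and "c",
are assumptions introduced only for illustration. The sketch treats a data base state as a
mapping from items to values and a transaction as a function from one state to another that
reads only its input items and changes only its output items.

```python
from dataclasses import dataclass
from typing import Callable, Dict, FrozenSet

# A data base state maps each item name to its current value.
State = Dict[str, int]


@dataclass(frozen=True)
class Transaction:
    """Hypothetical model of a transaction as a function on data base states."""
    input_items: FrozenSet[str]
    output_items: FrozenSet[str]
    # Maps the input values to the output values.
    compute: Callable[[State], State]

    def apply(self, state: State) -> State:
        # The transaction sees only the values of its input items.
        input_values = {item: state[item] for item in self.input_items}
        output_values = self.compute(input_values)
        if set(output_values) != set(self.output_items):
            raise ValueError("transaction wrote outside its output items")
        # The new state differs from the old one only in the output items.
        new_state = dict(state)
        new_state.update(output_values)
        return new_state


# Illustrative transaction: move 10 units from item "a" to item "b".
transfer = Transaction(
    input_items=frozenset({"a", "b"}),
    output_items=frozenset({"a", "b"}),
    compute=lambda v: {"a": v["a"] - 10, "b": v["b"] + 10},
)
```

In this sketch apply returns a new state and leaves its argument unchanged, which mirrors the
view of a transaction as a mapping between states rather than a sequence of in-place accesses.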

The user of a distributed information system views each transaction as a simple, 
complete operation, such as "deposit $50 in account number 13648". Each transaction "sees"
the effects of previous transactions in the values that it obtains for its input items. A
problem arises when several transactions are performed concurrently. Each transaction may
see the effects of the others on the shared data items. In order to preserve the illusion that a
transaction is a simple, complete operation, the transactions must be atomic, in that each



1. The term "version" has also been used for what I will refer to as a value
[Reed78, Stearns76].

2. This has also been referred to as the write set of the transaction [Bernstein77].



transaction sees either all or none of the effects of each other transaction on the data items
that it accesses. The definition of atomic will be made more precise in a later chapter.
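
The requirement that concurrent execution give the same result as some sequential order can
be sketched as a brute-force check. This is only an illustration under simplifying
assumptions, not a mechanism from the thesis: it compares final states only, it tries every
serial order (practical only for a handful of transactions), and the function and item names
are hypothetical.

```python
from itertools import permutations


def is_serially_equivalent(initial, transactions, observed_final):
    """Return True if observed_final equals the result of some serial order."""
    for order in permutations(transactions):
        state = dict(initial)
        for txn in order:
            state = txn(state)
        if state == observed_final:
            return True
    return False


def add_interest(state):
    # Hypothetical transaction: add 10% interest to item "x".
    return {**state, "x": state["x"] * 11 // 10}


def deposit(state):
    # Hypothetical transaction: add 50 to item "x".
    return {**state, "x": state["x"] + 50}
```

Starting from x = 100, the two serial orders give 160 and 165. A final value of 150, which
arises if both transactions read 100 and the deposit's write overwrites the interest, matches
neither order, so that interleaving is not atomic in the sense described above.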

The problem of insuring that transactions which are run concurrently are atomic is
known as concurrency control and is common to both distributed systems and to centralized
data base systems, where transactions are run concurrently to increase the utilization of
resources. While there is a great deal of literature on this general problem, the particular
characteristics of a distributed information system aggravate the problem of concurrency
control, and make many of the mechanisms that have been developed to solve this problem
in centralized data base management systems inappropriate for a distributed information
system.

1.3 Basic Assumptions and Goals

Two common problems in evaluating a mechanism to solve a complex problem are
understanding the goals of that mechanism and knowing the assumptions made about the
effects of failures. This section sets forth my own goals and assumptions, to allow the reader
to evaluate more precisely the mechanism proposed here. These assumptions and goals may
not be appropriate for all applications, but I believe that they are most appropriate for many
uses of a distributed information system as described above.

1.3.1 Implications of Delay 

A characteristic of distributed information systems is that communication between
sites is slower, more costly, and less reliable than communication within a site. An
implication of this characteristic is that unnecessary inter-site communication should be
minimized, even if this requires more computation or more storage at each individual site.¹

A second implication of communication delay is that no one site can readily obtain a
view of the global state of all transactions in progress. State information from remote sites is
delayed in communication and may be out of date. The lack of global state information
makes concurrency control schemes in which some decisions (such as deadlock detection and
backup) are made based on global information awkward for use in a distributed information
system. Thus, ideally, the protocols used for performing transactions should allow each site
to base its actions on its local state only.

A third implication of delay is that any operation involving several sites may be 
delayed for a long period of time before it can be completed. This means that the 
information should be organized such that frequent or important operations can be 
accomplished locally at some site. While I will not discuss the task of partitioning
information in detail, I assume that the operations to be performed exhibit a high degree of
locality of reference. Each operation requires only a small amount of the total information
available, and the information can be partitioned so that very few operations require
information from two or more sites.

This assumption is necessary to make a distributed information system practical. It
seems quite reasonable for many applications, including management information systems,
process control, and personal computing.



1. I am not addressing the concept of a "multi-microprocessor" distributed system consisting
of a large number of small processing and storage elements linked with very high bandwidth
communication.



1.3.2 Partial Operability

As noted above, the individual sites in a distributed information system should fail
less often than a single centralized system of equivalent processing power and storage
capacity. If each site failure interrupts only those transactions which require resources at the
failed site, then a transaction involving only a small number of sites should be less likely to
be affected by a failure in a distributed information system than it would be in a centralized
system. Thus as a goal, the mechanisms for performing transactions should allow a group of
sites that are functioning and can communicate with each other to perform transactions local
to that group. I refer to this goal as partial operability. The most important aspect of
partial operability is to allow any transaction that is entirely local to one of the sites to be
performed whenever that site is operating and the request to perform the transaction can be
communicated to that site.

This is a very different form of enhanced reliability from that achieved with
replication, as described by Alsberg et al. [Alsberg76]. I believe that the goal of partial
operability more accurately reflects the needs of most applications. We shall see later that
both replication of data within one site and replication of data items at several sites fit
naturally into the mechanism that I am proposing.

An implication of partial operability is that the dependence of one site on another to
perform purely local transactions must be minimized. Protocols requiring a site to receive
external authorization to perform local transactions, such as that used by Thomas in
[Thomas76], should be avoided.



A more important implication of partial operability is that error detection and
recovery are concurrent with the execution of transactions. Backward error recovery
strategies [Randell78], which stop processing new transactions when an error is discovered
and cause the data base state to be "backed up" to a state known to be
consistent, do not achieve the goal of partial operability. Because processing continues
during error recovery, a site that encounters an error can "get behind" in that it may not be
aware of recent transactions. For example, a site holding a copy of a redundant data item
may discover that the values that it holds are out of date because they do not reflect
transactions that were performed on other copies. The recovery mechanism must
record any information sent to a site during a failure of that site, so that the site can be
brought up to date on recovery.

1.3.3 Autonomy

As noted above, the autonomy of individual sites in a distributed information system 
is an important reason for choosing such a system over one with a central shared facility. 
One implication of autonomy consistent with the goal of partial operability is that individual
sites should not be dependent on the system as a whole, in that they should be capable of
performing local transactions when not in communication with other sites. Thus we cannot
assume that a site which is not in communication with any other sites stops all processing, as
is done by SDD-1 [Bernstein77].

Another implication of autonomy is that each site controls the operations that can be
performed on the data items that it holds. Thus any site may refuse to perform some
operation at any time. One method of dealing with this possibility is to require that each
transaction obtain permission to perform all of its component operations before any of these
operations is carried out. This can substantially increase the cost of performing some
transactions, by increasing the need for locking (see Chapter 4).

For many transactions, the administrative policies of all of the sites that must
cooperate are known in advance and examined in determining whether or not a site will
cooperate in performing a particular transaction. Verifying that a transaction will not
encounter access restrictions is similar in principle to verifying that a transaction preserves
consistency constraints (i.e. that it always maps one consistent state to another). I will assume
that even though the sites are autonomous, they will cooperate in performing a large class of
common transactions. Thus in many cases, the acceptability of a transaction to be run can be
simply verified before it is run, and will not interfere with synchronization. Dynamically
changing access restrictions must be checked as a transaction is run, and will add to the cost
of performing and synchronizing transactions.

1.4 Related Work 

The work of this thesis concentrates in two main areas: concurrency control in data 
base systems, and reliability techniques. I will discuss the previous research in these areas
separately first, and then relate it to this thesis.

1.4.1 Concurrency Control 

Several papers [Bernstein77, Gray75, Gray77, Stearns76] discuss the problem of
controlling the concurrent execution of transactions so that each sees a consistent version of
the data base. Gray et al. [Gray75] give definitions for four different levels of consistency
and discuss locking strategies to achieve each. Atomic transactions as I have defined them
maintain the highest level of consistency (level 3) defined in that paper. This is the level




that places the greatest constraints on concurrent execution of transactions.¹ The locking
strategies presented by Gray are efficient, in that they allow the data base to be constructed
so that a high degree of concurrency may be obtained with little locking overhead.

A second paper by Gray [Gray77] discusses a mechanism for concurrency control in a
distributed system that makes use of the locking strategies described in the first paper.
While this mechanism performs transactions correctly unless highly improbable failures
occur, it fails to meet two of the goals outlined above. The locking strategy allows
transactions to deadlock, requiring some mechanism to detect deadlock and abort one of the
transactions involved in a deadlock in order to allow the others to proceed. Deadlock
detection requires a view of the global state of all transactions in progress, violating the
condition of making decisions based on local information.

The two-phase commit protocol used by Gray and others insures that a transaction is
atomic, no matter what failures occur during its execution. If a failure occurs at the wrong
time, however, one or more of the sites involved in a transaction may be obligated to hold
onto locks set by the transaction until the failure is recovered, preventing the execution of
transactions local to that site that set locks which conflict with those set by the transaction
suspended by the failure. This violates our goal of partial operability.
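
The blocking behavior described above can be sketched from the point of view of a single
site. This is a simplified illustration, not the protocol of [Gray77] or of Figure 5.1: the
class Participant and its methods are hypothetical names, locks are exclusive, and logging,
timeouts and the coordinator are omitted.

```python
class Participant:
    """Hypothetical site taking part in a two-phase commit."""

    def __init__(self):
        self.locks = {}        # item -> id of the transaction holding its lock
        self.prepared = set()  # transactions that voted yes and await a decision

    def prepare(self, txn, items):
        """Phase one: lock the items and vote on whether txn may commit."""
        if any(item in self.locks for item in items):
            return False       # vote no: a conflicting lock is already held
        for item in items:
            self.locks[item] = txn
        self.prepared.add(txn)
        return True            # vote yes: the locks must be held until decide()

    def decide(self, txn, commit):
        """Phase two: the outcome has arrived, so the locks can be released."""
        # Installing or discarding the updates according to `commit` is omitted.
        self.prepared.discard(txn)
        self.locks = {i: t for i, t in self.locks.items() if t != txn}

    def can_run_local(self, items):
        """A purely local transaction can run only if none of its items is locked."""
        return not any(item in self.locks for item in items)
```

If a failure delays the call to decide after a site has voted yes, can_run_local keeps
returning False for the locked items even though the site itself is running, which is the
loss of partial operability that the paragraph above describes.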



1. While the authors claim that forcing all transactions to see level 0 or level 1 consistency
allows transactions to be constructed to see higher levels of consistency, and may save locking
overhead by allowing many transactions to run at the lower levels of consistency, they also
point out that output values produced by a transaction reflect the level of consistency that
that transaction saw. These low-level consistency values are propagated by any transaction
that reads them, so that transactions desiring a high level of consistency can never read
values produced by those observing a lower level. Thus low level of consistency transactions
would appear to have very limited use.



A study by Stearns and Rosenkrantz [Stearns76] discusses a model for distributed
data bases in which the data are partitioned among sites and each transaction is performed
by a process that migrates among the sites that hold the values that the transaction accesses.
Each site is responsible for controlling the execution of transactions at that site, and the sites
communicate only when a transaction is moved and when a transaction is completed. The
authors describe a class of control algorithms that work by assigning an order to the
transactions to be processed and use that order to resolve conflicts between processes
attempting to access the same data, possibly by aborting and restarting them. The necessity
of restarting a transaction that has completed a substantial amount of processing is
undesirable, but seems unavoidable in this model of concurrency control. Similarly, the
[remainder of sentence illegible]

Several papers [Bernstein77, illegible] describe SDD-1, a distributed database system
in which the set of transactions to be performed on the data base is analyzed to determine
the amount of locking needed. Transactions are divided into classes by the sets of items that
they read and write, and transactions in the same class are performed serially with respect to
each other. Transactions in different classes can be performed concurrently. The conflicts
between the sets of items read and written by different classes are used to select
synchronization protocols to be used to coordinate concurrent transactions from different
classes. Frequently, transactions can be run concurrently with little synchronization overhead.

The approach used in SDD-1 of pre-analyzing the set of expected transactions to
minimize the synchronization overhead for the most common transactions seems to be very
promising. The proof that this technique works (i.e. that all transactions are atomic),
however, is so long and complicated as to be unconvincing. Making SDD-1 robust in the
event of failures also appears difficult. The synchronization protocols used frequently
involve waiting for messages that may be delayed by failures. The techniques used to insure
that delayed messages do not cause excessive delay in the processing of transactions are
extremely complicated, and may reduce some of the efficiency of this synchronization scheme
by requiring additional message exchanges.

The reliability goal of SDD-1 is also somewhat different from that of this thesis. The
goal in SDD-1 is to keep the system as a whole running, even if this means that sites that are
separated from the network while involved in a transaction that spans several sites must
stop. Thus SDD-1 does not achieve our goal of partial operability.

1.4.2 Reliability 

The work in reliability is perhaps less developed than that on concurrency control.
An important paper by Johnson and Thomas [Johnson75] describes an algorithm for
updating redundantly stored data such that all copies converge to the same final value. The
paper uses the notion of a timestamp, which expresses the order in which updates should be
performed, so that all copies converge to the same final value even if the updates are
delayed, duplicated, or arrive out of order. Timestamps have been used in many protocols
for reliable synchronization. This paper does not discuss the problem of synchronization for
concurrent updates.
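
The convergence property described above can be illustrated with a minimal sketch. This is not the algorithm as published; it assumes that a timestamp is a (clock reading, site identifier) pair compared lexicographically, and the names `Copy` and `apply` are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class Copy:
    """One site's copy of a single data item (hypothetical structure)."""
    value: object = None
    timestamp: tuple = (0, "")  # assumed form: (clock reading, site id)

    def apply(self, value, timestamp):
        # Keep an update only if it is newer than what this copy holds.
        # Stale, duplicated, or reordered updates are simply ignored.
        if timestamp > self.timestamp:
            self.value = value
            self.timestamp = timestamp


updates = [("x=1", (1, "A")), ("x=2", (2, "B")), ("x=3", (3, "A"))]

site1, site2 = Copy(), Copy()
for value, ts in updates:                      # arrives in order
    site1.apply(value, ts)
for value, ts in reversed(updates + updates):  # reordered and duplicated
    site2.apply(value, ts)
```

Both copies end with the value carrying the largest timestamp, whatever the delivery order. As the text notes, this gives convergence only; it does not synchronize concurrent updates that read the data they modify.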

Thomas [Thomas76] proposed an extension of the ideas in that paper to provide
synchronization. An algorithm was developed to allow updates to be performed as long as
more than half of the sites were functioning. The algorithm is complex, and several flaws
were found in the early versions. Another major problem with the Thomas algorithm is that
it applies only to cases where the entire data base is stored at each site.



Alsberg and Day [Alsberg76] have developed a robust multi-copy update algorithm
with a somewhat different approach. They designate one copy as the primary, and insist
that all accesses occur through the primary copy. The other copies serve only as backups in
case the primary fails. This strategy eliminates one of the major advantages of replication of
data, that of greater concurrency in access. The algorithm does, however, seem applicable
where the only concern is greater reliability, and not greater concurrency.

A forthcoming paper by Lampson and Sturgis [Lampson76] presents a general
discussion of performing atomic transactions in a distributed system. The paper presents a 
method of storing and updating information in a single machine, such that it is preserved 
and updated correctly even if crashes occur during updates. This storage technique is useful 
for implementing an atomic update within one site. 

The last part of that paper gives an algorithm for performing updates at several
different sites atomically. A complicated protocol is used to distribute the updated values to
each site, such that during most of the procedure, each site can independently decide to abort
the update if messages are slow in arriving. There is still, however, a time window in which
a site must wait for the arrival of a message from another site, and cannot decide
whether or not to abort the update if such a message is slow in arriving. This algorithm is
similar to the two-phase commit protocol described by Gray [Gray77] and that used by Reed
[Reed78]. The Lampson and Sturgis algorithm makes the time window during which a site
can not abandon a transaction interrupted by a failure quite small by insuring that all of the
computation done by the transaction will be completed before any site is prevented from
abandoning the transaction. This is accomplished via extra steps in the protocol and extra
message exchanges. Chapter 5 discusses commit protocols in much greater detail.



Reed [Reed78] is also working in the area of robust synchronization mechanisms. He
has developed a scheme in which each value assigned to an item can be named as a version
of that item. The scheme allows a transaction to obtain a set of mutually consistent values
for the items that it accesses by choosing the proper version names. This scheme is subject
to the same limitations as the Stearns and Rosencrantz scheme, in that a transaction may
need to be aborted to avoid deadlock. This problem is solved by having all of the updates
performed by a transaction (by creating new versions) be conditional until the transaction
has been completed.

This same mechanism of conditional transactions is used to solve the atomic
distributed update problem. The mechanism is simple and convincing, but still leaves a time
window in which a failure can cause delay in processing new transactions.

1.4.3 Relationship of this Thesis to Previous Work

This thesis presents a model for distributed computing that specifies the effects of
failures on computation. The model is similar to the Actors model of computation
[Hewitt76, Hewitt77]. The model describes computation performed by an unreliable system, in
which components can fail and failures affect computation in ways specified by the
model. The thesis discusses implementation techniques that can be used to insure that the
actual effects of failures conform to their effects as described in the model. The techniques
used build on the work of Lampson and Sturgis [Lampson76] and Gray [Gray77].

While much research has been done on the problems of synchronization in message
based models of computation [illegible, Halstead78, Hewitt77], much of this work has
centered on developing primitive synchronization techniques that achieve mutual exclusion.
This thesis shows how to apply such techniques (the atomic process steps of the model) to a
more complicated problem: coordinating transactions.

Most of the work on control of concurrent transactions has been on mechanisms that
allow transactions to deadlock, using some mechanism to detect a deadlock situation and
abort one of the transactions to resolve the deadlock and allow the others to proceed. The
mechanism presented in this thesis instead avoids deadlock. This mechanism allows more
concurrency than many other deadlock avoiding synchronization schemes, by postponing the
actual locking of a resource until it is needed or must be locked to avoid deadlock with some
conflicting transaction which needs that resource. This approach avoids unnecessary locking
that restricts concurrency.

The mechanism used for synchronization in this thesis is based on control of the
order in which messages are delivered. Implementations of the control algorithm that make
efficient use of the kinds of communication networks used in
distributed information systems are given. This approach is quite different from, and can be
substantially less costly than, the approach taken by most of the work on synchronization in
distributed systems, which makes no assumptions about the communication network and
implements synchronization constraints with higher level protocols.

The technique used to coordinate transactions involves an analysis of the access
pattern of transactions that is similar to that used in SDD-1 [Bernstein77], but more fine
grained in that the actual derivation of each output of a transaction from the inputs to that
transaction is examined, rather than assuming that
every output of a transaction depends on every input, as is done in SDD-1. This analysis
shows how to structure the synchronization scheme so that frequent or important transactions
can be performed with minimal overhead due to the synchronization.



The thesis includes a proof that it is impossible to solve the "atomic distributed
update" problem for all cases in a way that achieves the goal of partial operability, given the
semantics of the model presented here. The proof applies arguments advanced by [Gray77]
and [Akkoyunlu75] to the model of distributed computing presented in the thesis.

A novel approach to the atomic distributed update problem is presented. This
approach involves keeping several current values for some data items, and builds on the
version naming synchronization schemes of Reed [Reed78] and Stearns et al. [Stearns76].
This approach is not limited to the particular synchronization scheme discussed in this
thesis, but is applicable to any of the synchronization schemes discussed above.

To summarize, I feel that the important contributions of this thesis are: 

A model for distributed computing in which the effects of failures
are well specified and implementation techniques for meeting these
specifications.

A technique for coordinating what I refer to as an "atomic
broadcast" that can be implemented efficiently in the kinds of
computer networks currently used to connect sites in distributed
information systems.

A technique for analyzing a set of transactions to be performed to
determine which ones can be performed without locking.

A mechanism for locking data items at several sites in order to
perform a distributed atomic update without allowing a failure to
delay access to the locked data indefinitely, in most cases.



1.5 Thesis Plan

Chapter 2 presents the process model of distributed computing that is used
throughout the thesis. Techniques for implementing a distributed system that behaves as
specified by the process model are discussed. The problem of synchronizing transactions is
formulated in terms of this model. The chapter discusses several ways in which the order of
execution of transactions can be controlled, and shows that only one of these achieves the
goal of partial operability.

Chapter 3 discusses a simple synchronization problem that consists of coordinating
what I refer to as an atomic broadcast. An atomic broadcast distributes a set of messages to
a set of receivers such that the order in which any one receiver sees messages from several
such broadcasts is consistent with the order in which the broadcasts are received by any
other receiver. A simple mechanism to perform this task is presented. This mechanism
forms the basis of the synchronization mechanism for concurrent transactions discussed in
Chapter 4. Implementations of this mechanism that take advantage of the synchronization
constraints imposed by the communication network are discussed. These implementations
distribute the messages with very little overhead attributable to the enforcement of
synchronization constraints.

Chapter 4 discusses the problem of synchronizing transactions. A technique for
analyzing a set of transactions to determine what synchronization protocols are needed is
discussed. This analysis is used to show that correct synchronization of all transactions
cannot be accomplished with a protocol that achieves the goal of partial operability. Three
different classes of transactions are distinguished, on the basis of their access patterns. A
mechanism that builds on the atomic broadcast mechanism of Chapter 3 is presented to
perform transactions. This mechanism can be tailored to minimize the cost of synchronizing
transactions that are expected to be performed frequently. The mechanism is general,
however, in that any transaction, expected or unexpected, will be correctly synchronized.
Unexpected transactions have little impact on the efficient operation of the synchronization
mechanism for the expected transactions.

Chapter 5 considers the implications of the need for locking on the goals of partial
operability and autonomy. These goals dictate that a site that has set a lock for some
transaction should be able to decide to abort that transaction if a failure interferes with the
prompt completion of the transaction or if the transaction violates the access policy of the
site. I show that there is no protocol that can be used to insure that no failure can prevent a
functioning site from promptly completing or aborting a transaction requiring locking.

As a solution to this problem, I propose a novel mechanism that allows locked data 
items to be made available to other transactions before the completion or abortion of the 
locking transaction is decided. This mechanism is appropriate for systems in which the
ability to perform transactions in real time, without long delays waiting for locks to be 
released, is important. 

Chapter 6 presents a comprehensive example showing how to apply the techniques of 
this thesis to a typical distributed information system. The example is an inventory control 
system described in a report on SDD-1 [Bernstein77]. The techniques of this thesis are used
to develop a robust synchronization scheme for this example with little overhead due to the 
synchronization. 

Chapter 7 summarizes the new ideas in the thesis and discusses areas for future 
research. 




Chapter 2 

The Process Model of Distributed Computing



This chapter presents the model for distributed computing that will be used in
discussing synchronization in a distributed information system. The first section presents the
model, which includes specifications of the effects of failures on computation expressed in the
model. Implementation strategies for limiting the impact of actual failures to the
failure effects specified in the model are discussed. The second section poses the problem of
performing transactions (as described in Chapter 1) atomically in the framework of the
model. Various techniques that could be used for synchronization are discussed to show that
only one of these can be used by a system that achieves the goal of partial operability.

2.1 The Model 

Based on the assumptions and goals set forth in the previous chapter, I will now 
describe a model for computation in a distributed information system. In order to centralize 
the description, this chapter presents all of the model, even though some of the concepts will 
not be used until much later in the thesis. This model includes two forms of communication: 
message passing, and changes in state observable by later computations. Message passing 
may occur between sites or within one site. Communication through state changes, however, 
occurs only within a single site. 



2.1.1 Definitions: 

The basic unit of the model is a process.¹ A process can be viewed as the unit
within which communication through state changes can occur. A process consists of a local
state, a set of input ports, and a set of process step specifications. The computation
performed by a process takes place in a series of process steps. A process step maps an
input local state and a set of input messages into an output local state and a set of output
messages. Each process step specification specifies the form of a process step by stating:

A set of input ports for the step. One message is received by the
step from each port in this set.

The output local state as a function of the input local state and the
messages received.

A set of output messages and their destination ports. Both the
message contents and destination ports must be specified as
functions of the input local state and the messages received.

An important point to note about a process step is that it computes its output
messages and output local state. Thus a single process step can be used to perform
computation on the local state of the process and the messages received, rather than simply
retrieving information from the local state or storing information in the local state in
response to messages. This capability of a process step to perform computation is used in the
implementation of a transaction, as will be discussed in Chapter 4.
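
A process step specification as defined above can be written down as a small data structure. The following is only an illustrative sketch: the names `StepSpec`, `transition`, and the `deposit` example are assumptions made for the illustration and do not appear in the thesis.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Tuple

Message = object
Outputs = List[Tuple[str, Message]]  # (destination port, message) pairs


@dataclass(frozen=True)
class StepSpec:
    """Hypothetical encoding of one process step specification."""
    input_ports: Tuple[str, ...]
    # (input local state, {port: message}) -> (output local state, outputs)
    transition: Callable[[object, Dict[str, Message]], Tuple[object, Outputs]]


def deposit(balance, received):
    # The step computes: both the new state and the reply are functions
    # of the input local state and the message received.
    amount, reply_port = received["deposits"]
    new_balance = balance + amount
    return new_balance, [(reply_port, ("balance", new_balance))]


deposit_step = StepSpec(input_ports=("deposits",), transition=deposit)
state, out = deposit_step.transition(100, {"deposits": (25, "teller-7")})
```

Note that the destination port of the reply is itself taken from the received message, which is the sense in which destination ports are functions of the inputs.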



1. The word process has been used to denote a number of ill-specified concepts in the
literature. My use of the term process is not inconsistent with the common usage of the term;
however, the reader should realize that the term has a very specific meaning in this thesis.
Other terms that have been used for very similar concepts are Actor [Hewitt76] and message
handler [Reed78].



Conceptually, each process resides at one site, its home site. The home site of a
process is the location of the process state of that process. The home site is also responsible for
carrying out process steps. The fact that each process is implemented at a single site will be
used in determining the effects of failures on the execution of process steps in this model.

Each of the process steps of a process is atomic with respect to the other steps of that 
process. The output local state of one process step becomes the input local state of the next 
step in the sequence. The execution history of a process consists of the sequence of steps 
that have been performed by that process. For each process p, there is an ordering $<_p$ on
the steps of p, such that $s_1 <_p s_2$ if $s_1$ preceded $s_2$ in the execution history of p.

The set of messages that a process has received in each of its process steps and the
initial local state of the process form a complete description of its execution history. From
the messages received at each step and the process step specifications, one can deduce the
messages that are produced and the changes made to the process state. The input messages
to each step can be represented by a set I of [message, port] pairs describing the messages
received and the ports at which they were received.
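
The claim that the initial state and the received messages determine everything else can be made concrete by replaying a history. The sketch below assumes, for brevity, a process with a single step specification; `replay` and `counter_step` are hypothetical names.

```python
def replay(initial_state, step, history):
    """Recompute a process's state and output from its execution history.

    history holds one entry per step, earliest first; each entry is a set
    of (message, port) pairs received by that step.
    """
    state, sent = initial_state, []
    for inputs in history:
        state, outputs = step(state, inputs)
        sent.extend(outputs)
    return state, sent


def counter_step(total, inputs):
    # Hypothetical step: add up every number received and report the total.
    total += sum(message for message, _port in inputs)
    return total, [("report", total)]


history = [{(5, "in")}, {(7, "in")}, {(1, "in")}]
final_state, sent = replay(0, counter_step, history)
```

Because each step is a function of its inputs, replaying the same history always yields the same final state and the same output messages.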

Figure 2.1 shows an example of an execution history. The figure shows a list
describing the input messages to the process steps of P. The first process step of P is
represented by the bottom entry in the list, with subsequent process steps higher in the list.
This list may be thought of as a log that records the messages received by P. When a
process receives messages at a single port only, the execution history can be represented by a
list of messages received, as each step receives a single message at that port.



[Figure 2.1: Execution history of a process]







The local state of a process is private to that process and can only be changed by
process steps of that process. Similarly, for each port, there is a single process that receives
the messages sent to that port.¹ The lack of "sharing" of ports or local state between
processes greatly simplifies the implementation of processes in a distributed system.

The characteristics of a process stated above (sequential execution of
each process at one home site, and no sharing of process state) describe the behavior that a
process must exhibit, not its implementation. In practice any implementation that behaves as
described above is satisfactory. In an implementation of processes, for example, process steps
may be executed concurrently so long as the behavior is the same as that produced by
sequential execution.



1. Note, however, that several processes may send messages to the same port.



One can view the execution of a process as being performed by an interpreter that
carries out the execution of all of the processes in a system. This interpreter maintains a
local state for each process and a set of messages for each input port. One cycle of this
interpreter selects a process step specification of some process, selects a message from each of
the input ports for that step, and carries out the selected step. The interpreter deletes the
received messages from the sets of messages for the input ports, changes the local state of the
process, and adds any output messages produced to the sets of pending messages for the
appropriate ports.
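
The interpreter cycle just described might be sketched as follows. The model leaves the choice of step and of message open; this sketch assumes, purely for simplicity, that the first enabled step is chosen and that pending messages are taken oldest first. All names are hypothetical.

```python
from collections import defaultdict


class Interpreter:
    def __init__(self):
        self.state = {}                   # process name -> local state
        self.specs = []                   # (process, input ports, transition)
        self.pending = defaultdict(list)  # port -> messages awaiting delivery

    def add_process(self, name, initial_state, input_ports, transition):
        self.state[name] = initial_state
        self.specs.append((name, tuple(input_ports), transition))

    def cycle(self):
        """Carry out one enabled process step; return False if none exists."""
        for name, ports, transition in self.specs:
            if all(self.pending[p] for p in ports):
                # Delete one message from each input port of the step.
                received = {p: self.pending[p].pop(0) for p in ports}
                self.state[name], outputs = transition(self.state[name], received)
                # Add the outputs to the pending sets of their destinations.
                for port, message in outputs:
                    self.pending[port].append(message)
                return True
        return False


def double(count, received):
    # Hypothetical step: count the messages seen and emit twice each number.
    return count + 1, [("results", 2 * received["numbers"])]


interp = Interpreter()
interp.add_process("doubler", 0, ["numbers"], double)
interp.pending["numbers"] += [3, 4]
while interp.cycle():
    pass
```

After the loop, the `numbers` port is empty, the process has taken two steps, and two messages are pending at the `results` port.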

The interpretation can be distributed (one interpreter for each process) because the
only interaction between steps of one process and steps of some other process is the sending
of output messages produced by one process to input ports of some other process. This
interaction can easily be accomplished by message passing between the distributed
interpreters.

2.1.2 Effects of Failures in the Model 

The process step specifications completely specify any computation taking place in
the absence of failures of the underlying mechanism that implements process steps. This
section discusses the kinds of failures that can arise in a distributed information system and
their effect on the execution of process steps and on message passing between processes. Two
extensions of the process model to include a specification of the effects of failures on
computation are presented.




Two different kinds of failures can occur in a distributed information system: site
failures and communication failures. A site failure is local to some site, and can cause
processing at that site to be suspended, or cause information stored at that site to be lost or
damaged. A communication failure causes inter-process messages to be lost, delayed,
damaged, or delivered to the wrong recipient.² Many implementation techniques can be used
to hide the effects of failures from a user.



An error detecting code, or checksum, can be used to detect messages that have been
damaged or delivered to the wrong recipient. While it is impossible to detect all such errors,¹
the probability of undetected communication errors can be made arbitrarily small by
increasing the proportion of each message devoted to error detection. I will therefore make
the assumption that all communication errors can be detected and henceforth ignore the
arbitrarily small, but non-zero, probability of an undetected error. If any message that is
found to be in error is discarded, the effects of communication failures are limited to lost or
delayed messages.
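
As a rough illustration of discarding messages found to be in error, the sketch below frames each message with a 32-bit CRC computed over both the intended recipient and the payload, so that damage and misdelivery are both caught by the receiver. The framing format, the use of CRC-32, and the function names are assumptions of this sketch, not something the thesis specifies.

```python
import zlib


def frame(payload: bytes, recipient: str) -> bytes:
    # Assumed layout: 4-byte CRC, recipient name, zero byte, payload.
    body = recipient.encode() + b"\x00" + payload
    return zlib.crc32(body).to_bytes(4, "big") + body


def accept(packet: bytes, me: str):
    """Return the payload, or None if the packet must be discarded."""
    checksum, body = packet[:4], packet[4:]
    if zlib.crc32(body).to_bytes(4, "big") != checksum:
        return None  # damaged in transit
    recipient, _, payload = body.partition(b"\x00")
    if recipient.decode() != me:
        return None  # delivered to the wrong recipient
    return payload


good = frame(b"hello", "q")
damaged = good[:-1] + bytes([good[-1] ^ 1])  # flip one bit of the payload
```

Here `accept(good, "q")` returns the payload, while `accept(damaged, "q")` and `accept(good, "r")` both return `None`, so the only visible effect of either error is a lost message.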

1. No matter what kind of error detecting code is used, there is a chance that a
communication failure will cause a message to be transformed so that it appears to be correct
to the error detection mechanism but does not correspond to the original message.

2. Many communication systems exhibit another failure mode in which a message is
duplicated. In designing a communication protocol one has a choice as to whether to
guarantee that all messages are delivered reliably, possibly delivering some twice, or to
guarantee that each message arrives at most once, and that some messages may be lost. I
have chosen the latter alternative. In the next section, I introduce the concept of robust
sequenced communication which hides the effects of both of these kinds of failures.

The most common effect of a site failure is that computation at that site is
temporarily suspended, and that some of the computation in progress at the time of the
failure may be lost. If a site failure were to occur during the execution of a process step, that
step might be left partially completed, with the local state of the process corresponding to
neither the input state nor the output state of that step. This can be prevented by using a
robust storage management technique for storing the local state of a process. Such a
technique allows a group of updates to be made atomically to information stored at one site,
such that if a failure occurs either all or none of the updates take place. The atomic stable
storage mechanism of Lampson and Sturgis [Lampson76] is such a technique. A description
of all of the updates to be performed, known as an intentions list, is formed and written to
permanent storage in a single operation before any of the updates are carried out. A failure
occurring before the intentions list is generated or one interfering with the writing out of the
intentions list causes none of the updates to be performed. Once the intentions list has been
written, however, the error recovery mechanism can use it to insure that all of the updates
specified will be made, even if the site making the updates fails after having partially
completed them. The write-ahead-log protocol of Gray [Gray77] also provides the same
capability for making a collection of updates atomically, by writing out a description of the
updates to be made to a log tape before any of the updates are made.
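
The intentions list idea can be sketched on top of an ordinary file system. This is a simplified illustration rather than the Lampson and Sturgis mechanism itself: it assumes that renaming a file within a directory is atomic (as POSIX requires), that each update replaces the whole contents of a named file, and the function and file names are hypothetical.

```python
import json
import os


def atomic_update(directory, updates):
    """Apply {filename: text} updates so that all or none take effect."""
    intentions = os.path.join(directory, "intentions.json")
    tmp = intentions + ".tmp"
    with open(tmp, "w") as f:
        json.dump(updates, f)
        f.flush()
        os.fsync(f.fileno())
    os.replace(tmp, intentions)  # the single operation that commits
    recover(directory)


def recover(directory):
    """Run after every restart: finish any update whose list was written."""
    intentions = os.path.join(directory, "intentions.json")
    if not os.path.exists(intentions):
        return  # nothing was committed, so none of the updates happen
    with open(intentions) as f:
        updates = json.load(f)
    for name, text in updates.items():  # idempotent, so safe to repeat
        with open(os.path.join(directory, name), "w") as f:
            f.write(text)
            f.flush()
            os.fsync(f.fileno())
    os.remove(intentions)
```

A crash before the rename leaves no intentions list, so no update is made. A crash after it leaves the list in place, and `recover` carries out every update when the site restarts, repeating any that were already done.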

Each process step can be implemented as an atomic update to stable storage. This
implementation insures that a site failure leaves the local state of a process executing a
process step either at the input state to that step or the output state of that step, and not
some intermediate state or mixture of the two.

2.1.2.2 Two Ways to Include Failure Effects in the Process Model 

By using the low level implementation techniques discussed above, one can constrain
the way in which failures affect execution of processes. By augmenting the definitions of the
process model to include specifications of the effects of failures, we can produce a model that
describes computations in a "real" distributed information system in which site and
communication failures can occur. The choice of the specifications of the effects of failures
should be made so as to reduce the effects of actual failures on the model, but also to be sure
that an implementation of processes in which the effects of failures are limited to the
specifications can be obtained. I consider two different failure specifications, one that is easy
to implement and one that is harder to implement but limits the effects of failures more
severely.

2.1.2.2.1 Simple Processes 

Using basically the techniques described above, one can build an implementation of
processes in which the effects of a site or communication failure are limited to lost or delayed
messages. This is done by using error detecting codes to detect communication errors and
discard messages that are in error, and storing the local state of each process in atomic stable
storage. Some care must be taken in the implementation of a process step to insure that no
possible failure causes messages to be apparently duplicated, by causing some part of a
process step to be repeated. If a process step is restarted after being partially completed,
then it may send out the same message twice (once before being restarted and once after), or
may modify its local state as if it had received the same message twice.

These undesirable effects can be avoided by performing a process step in three
stages. First, delete any record of the input messages to the step so that a site failure
occurring at this point would cause them to be lost. Then, perform the process step and
update the local state of the process to reflect its completion. Finally, distribute the output
messages of the process step to their destination ports. A site failure occurring before the local
state of the process is updated can result in the process step not being performed, or an
apparent loss of all of the input messages to that step. A failure after this point may cause
output messages of the step to be lost. No failure causes the local state of a process to be
modified as if a process step were performed twice, or causes the messages produced by a
process step to appear to be duplicated.
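
The three stages can be sketched as follows, with an exception standing in for a site failure. The dictionary layout of `proc`, the `network` list, and the `crash` flag are assumptions made only to keep the illustration short.

```python
class CrashBeforeUpdate(Exception):
    """Hypothetical stand-in for a site failure during the second stage."""


def run_simple_step(proc, ports, transition, network, crash=False):
    # Stage 1: delete the input messages, so that a later failure loses
    # them rather than letting a restarted step consume them again.
    received = {p: proc["inputs"][p].pop(0) for p in ports}
    # Stage 2: perform the step and install the new local state.
    if crash:
        raise CrashBeforeUpdate
    proc["state"], outputs = transition(proc["state"], received)
    # Stage 3: distribute the outputs; a failure here may lose some.
    for port, message in outputs:
        network.append((port, message))


def add(state, received):
    total = state + received["in"]
    return total, [("out", total)]


proc = {"state": 0, "inputs": {"in": [5, 7]}}
network = []
try:
    run_simple_step(proc, ["in"], add, network, crash=True)  # 5 is lost
except CrashBeforeUpdate:
    pass
run_simple_step(proc, ["in"], add, network)  # 7 is processed exactly once
```

The failed step loses its input message but leaves the state untouched, and no message is ever consumed or sent twice.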

A less likely result of a site failure is that the information stored at a site in
permanent stable storage is damaged. This can be detected, with high probability, through
the use of error detecting codes. As with communications failures, however, it is impossible
to detect all such errors. The local state of a process can be replicated within one site to
decrease the probability that a failure will destroy all copies. A process step is implemented
as an atomic update to all of the copies of the process state. Any copy of the local state of a
process that survives a site failure can thus be used to become the current local state of the
process.

To summarize, the effects of a site failure can be limited to lost messages (through a
process step that was aborted after receiving messages), or delay of processes at that site.
This is achieved by using atomic stable storage to represent the local state of processes,
replicating local states, using an error detecting code to detect damage to a local state,
and indefinitely suspending any process for which no valid local state can be found.

Limiting the effects of failures to lost messages or delayed execution can easily be 
achieved without excessive communication or processing overhead. Many applications 
require a higher degree of reliability. In the next section, I discuss a different 
implementation of processes that gives a greater degree of reliability with greater overhead. 




2.1.2.2.2 Robust Sequenced Processes



The effects of failures on simple processes are well specified, but still undesirable for
most applications. For many applications, guaranteed delivery of all messages sent by a
process to a port is desirable. This is a very difficult constraint to express, as indefinite delay
of the delivery of any particular message by a communication failure cannot be prevented.
In order to clarify what I mean by guaranteed delivery, I will introduce a constraint that I
refer to as sequencing on the delivery of messages. Sequencing implies that messages sent
from one process p to a port q are received at q in the same order in which they were sent
by p. Robust sequencing implies in addition that no messages are lost.

For each port q define the ordering $<_q$ on the messages received at port q to be the
total order in which these messages were received. For each process p the ordering $<_p$ on
the process steps of p describes the order of occurrence of those steps. What I mean by
robust and sequenced message delivery is that for any process p that sends messages to port
q, the order $<_q$ in which the messages sent by p are received at q is exactly the same as the
order in which the steps that produced those messages are ordered by $<_p$. This means not
only that the messages are received at q in the same order in which they were produced, but
also that there are no gaps in the sequence of messages received. Reception of message m
sent by p to q can only occur after reception of any message m' sent by p to q for which
$m' <_p m$, that is, any m' produced by an earlier step of p.
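
The difference between sequencing and robust sequencing can be stated as two small predicates over the list of messages that p sent to q and the list that q has received from p so far. This is an illustrative sketch; the function names are hypothetical.

```python
def is_sequenced(sent, received):
    """Order is preserved, but messages may have been lost."""
    remaining = iter(sent)
    # Each received message must appear in `sent` after the previous one.
    return all(m in remaining for m in received)


def is_robust_sequenced(sent, received):
    """Order is preserved and there are no gaps: a prefix of what was sent."""
    return received == sent[:len(received)]


sent = ["m1", "m2", "m3", "m4"]
```

With these definitions, receiving `["m1", "m3"]` is sequenced but not robust sequenced (m2 is missing), receiving `["m1", "m2"]` satisfies both, and receiving `["m2", "m1"]` satisfies neither.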

Robust sequenced processes can be implemented by separating the execution of
process steps from the actual communication of messages from one site to another. This can
be done by maintaining a process database for each process. The process database for a
process p contains the local state of p, an input message queue for each input port to p, and
an output message queue for each port to which p has sent a message. Each output
message queue contains a list of messages and a transmit sequence number (TSN). The
input message queue for a port q contains a list of messages and a set of receive sequence
numbers (RSNs), one for each process that has sent a message to q. A process database is
stored using atomic stable storage, so that a site failure during modification does not cause a
process database to be left in some intermediate state.

A process step of p can now be implemented as an atomic update to the process
database of p, which removes the messages received by that step from the input message
queues, changes the local state of p, and appends the messages produced by that step to the
output message queues (if there is no queue for some destination port, a new one is created).
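
A minimal Python sketch of such a step follows. It is an illustration only: `ProcessDatabase` and `run_step` are hypothetical names, sequence numbers are omitted, a step is assumed to consume the first message of every non-empty input queue, and atomic stable storage is imitated by building the new database on a copy, so that a failure during the step leaves the old database untouched.

```python
import copy
from dataclasses import dataclass, field


@dataclass
class ProcessDatabase:
    state: dict = field(default_factory=dict)    # local state of the process
    inputs: dict = field(default_factory=dict)   # input port -> queued messages
    outputs: dict = field(default_factory=dict)  # destination port -> queued messages


def run_step(db, step):
    """Return the database that results from one process step.

    `step(state, received)` may modify `state` and returns a list of
    (destination port, message) pairs. If it raises, `db` is unchanged.
    """
    new = copy.deepcopy(db)
    # Remove the messages received by this step from the input queues.
    received = {port: q.pop(0) for port, q in new.inputs.items() if q}
    produced = step(new.state, received)
    for dest, msg in produced:
        # A queue is created the first time a destination port is used.
        new.outputs.setdefault(dest, []).append(msg)
    return new
```

The caller replaces the old database with the returned one only after `run_step` completes, which stands in for the single atomic write to stable storage.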

Messages can be transferred from an output message queue of a process p for a
destination port q to the input message queue for q with a robust communication protocol
using the sequence numbers RSN and TSN. Briefly, each site periodically attempts to send
the first message in any non-empty output queue, attaching the TSN of that queue to the
message sent. When the site holding port q receives a message sent from p, it verifies that
the sequence number attached to that message is equal to the RSN of q for p, and if so
updates the process database of the process associated with port q to add the message
received to the end of the input queue for q, and to increment the RSN of port q for p.
Whether or not the sequence number of the message received is correct, the receiving site
sends an acknowledgement to the site holding p containing the RSN of q for p. This
acknowledgement informs the sender of the most recently received message. The
acknowledgement either acknowledges receipt of a message or informs the sender that
retransmission of some message may be required. When the site holding p receives such an
acknowledgement, it verifies that the sequence number in the acknowledgement is the same
as the TSN of the message queue for q in the process database of p, and if so deletes the
first message in that queue and increments the TSN.
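
The exchange can be sketched in Python as follows. This is a hedged illustration, not the implementation used in this thesis: the class and function names are hypothetical, both sequence numbers are assumed to start at zero, stable storage is not modelled, and the acknowledgement is assumed to carry the sequence number of the most recently accepted message (one reading of the description above).

```python
from collections import deque


class OutputQueue:
    """Output message queue of p for one destination port."""
    def __init__(self, messages=()):
        self.messages = deque(messages)
        self.tsn = 0  # sequence number of the message at the head


class InputQueue:
    """Input message queue of one port q."""
    def __init__(self):
        self.messages = []
        self.rsn = {}  # sender -> next sequence number expected


def transmit(out):
    """Periodic send attempt: the head message tagged with the TSN."""
    return (out.tsn, out.messages[0]) if out.messages else None


def receive(inq, sender, packet):
    """Accept the packet only if it is the next one expected from sender."""
    seq, msg = packet
    expected = inq.rsn.setdefault(sender, 0)
    if seq == expected:
        inq.messages.append(msg)
        inq.rsn[sender] = expected + 1
    # Assumed acknowledgement: number of the most recently accepted message.
    return inq.rsn[sender] - 1


def handle_ack(out, ack):
    """Delete the head message once its receipt has been acknowledged."""
    if out.messages and ack == out.tsn:
        out.messages.popleft()
        out.tsn += 1
```

A lost message or a lost acknowledgement simply causes the head message to be sent again; the receiver discards the duplicate because its sequence number no longer matches the RSN, and repeats the acknowledgement.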




I will not at this point explain how the message queues are initially set up when two
processes first begin to communicate with each other. This is somewhat complicated and will
be discussed at length in Chapter 3, where a use for robust sequenced processes is discussed.

This implementation of processes guarantees delivery of inter-process messages in
sequence. The cost of the protocol is the extra messages (acknowledgements) used, and the
storage required for the message queues and sequence numbers. This cost is small if each
process converses with relatively few processes, and if messages in the output queues are
promptly forwarded.¹ In the synchronization protocols used in this thesis, each process
converses directly with relatively few other processes, so that the cost of robust communication
is small.

2.1.3 A Justification for This Model

A number of models have been proposed for distributed computing. I feel that the
model described above best reflects the goals and assumptions of the kind of distributed
information system discussed in Chapter 1.

One semantic model that has been proposed for distributed computing is the object
model [Liskov77, Saltzer78]. In the object model, information is represented by typed objects.
For each type of object, there is a set of operations, such as add, subtract, multiply, and
divide for integer objects, which can be used to manipulate objects of that type. In addition
to primitive objects such as integers or booleans, the user may define a new type of object,



1. Any site that does [illegible] refuse to execute a process step for a process with
non-empty output queues, and can refuse to acknowledge a message sent to a port with
[illegible]. These measures do, however, introduce the possibility of deadlock if [illegible]
processes.




describing the operations that can be performed on objects of the new type and a 
representation for objects of the new type. Computation is performed as sequences of 
operations on the sets of objects.

While the object model is a very natural one for many users, several problems arise
in the application of the object model to distributed computing. The most serious of these
problems is that it is unclear what the appropriate semantics for accessing remotely managed
objects should be. Many suggestions have been made, including treating all object references
uniformly, whether local or remote; treating references to remote objects specially and
maintaining a local copy of the remote object; and disallowing references to remote objects,
and instead using message oriented communication between sites. The first of these
suggestions is difficult to implement, while the others violate the conceptual simplicity of the
object model.

The uniform object model (in which a user computation does not distinguish 
between references to local and to remote objects) is difficult to implement reliably. 
Operations that involve objects at different sites can fail in different ways (due to the
possibility of communication failures) than operations on objects all at one site. Hiding the 
different failure modes from the user is difficult or impossible, forcing the user to deal with 
the problem of determining what the outcome of a sequence of operations on objects will be 
if failures interfere with their normal completion. 

Several similar semantic models based on message passing have been developed for
distributed computing. These include Actors [Hewitt76], the μ-calculus [Halstead78], and
data flow [Dennis75]. These models in their pure form all describe computation such that
the only communication between primitive computation events is through explicit message
passing.




The Actors model provides a uniform way of describing any computational activity
as a group of events, each event being the reception of a message by an Actor. One
problem with the Actors model is that exactly which Actors are primitive, implementing their
effects directly rather than sending messages to other Actors to achieve their effects, is left
unspecified. Thus it is sometimes awkward to deal with the Actors model, as there is always
another level of description below any level that you choose.

Side effects are introduced into the model with the notion of a cell, an Actor that
remembers one of the messages that it has been sent and repeats that message on request.
This primitive mechanism can be used for modeling computation in which the processing to
be applied to some message is not known in advance and is dependent on some future event,
such as storing a data item for later transactions. Cells are also used to implement events in
which two or more messages are logically "received" (by using cells to store messages), as the
Actors model does not allow an Actor to receive two or more messages in a single event.

The μ-calculus is similar in principle to the Actors model. It, however, provides a
mechanism for introducing primitive functions that are not implemented by message passing.
Cells are used in this model as well, for storing values for later use. In addition, a
mechanism called a token is introduced to provide a way for a pair of messages to be
received in one event. While the token mechanism is more straightforward than using a
cell, it is still rather hard to understand. Moreover, the implementation of a distributed
system based on the μ-calculus seems difficult (and in fact the protocols presented for one
such implementation [Halstead78] are very complex), particularly for implementing cells and
tokens.



Data flow schemas have frequently been used as a tool for describing repetitive 
processing, such as computing a Fourier transform. A data flow schema provides a natural 
mechanism for events in which two or more messages are received, unlike the above models.
Unfortunately, computations in which the processing to be applied to some message depends 
highly on the contents of the message are hard to describe in data flow. Recursion and 
iteration are somewhat difficult to express naturally, and greatly add to the difficulty of 
implementation. A data flow description of computation where a lot of information is stored 
for later (unknown) use, such as a data management system, is awkward. 

The process model previously described is an attempt to bring together some of the 
good features of the models described above, without the disadvantages. The two different 
forms of communication provided in the process model represent the properties of 
communication in a distributed system better than either observation of state changes or 
message passing alone. It is easy to specify the effects of a failure in a system based on 
processes, and to build an implementation of processes that meets the specifications. 
Distinguishing between intra-process and inter-process communication encourages the user to 
plan his application carefully so as to minimize unnecessary communication between sites, 
and to plan for site or communications failures. 

The process model also captures the concept of autonomy. All stored data is
represented by the local states of the processes. No process can be "coerced" into performing
some function for any other process. All access to stored information is mediated by some
process that can implement its own access control policy. This allows the problem of access
control to be largely ignored in the model, as each process can provide its own access control
policy. At the same time, the process step specifications of a process specify the access control
policy of that process by stating what the process does in response to the messages that it
receives. Thus a user implementing some application can examine the process step
specifications of processes providing services that he wishes to use and can in many cases
determine whether or not access restrictions will be encountered in his application.

Also inherent in the process model is the notion that some processing activities can
be performed simply by one site. Although each process has a specification of its
operation, in a real system most processes will be implemented from smaller pieces. I will not
specify what those pieces are, as the implementation of a process could be based on a
message passing system, a conventional programming language, or the object model,
depending on what is deemed most convenient. Within one process, however, one need not
deal with the special problems of a distributed information system, as each process is
executed solely at one site.

The mechanism used to specify a processing event that logically receives messages
from two or more sources (multiple ports) seems much more natural in the process model
than the mechanisms using cells or tokens. As events in which two or more messages are
received are common in many applications and can be constructed from the primitives in the
Actors or μ-calculus models, there seems to be no reason not to include this important special
case in the model. Inclusion of this capability [illegible] the cases where two messages are
being received by what is logically one process step. This makes it simpler to construct an
efficient and robust implementation [illegible] receive were simulated using some more
general mechanism.

The association of several independently named ports with one process is a very
useful feature of the process model. It can be used to group several independent processing
activities that wish to communicate via a shared data base in a single process. Such
processing activities can be implemented as independent process step specifications of the
same process, each of which receives its input messages through a different set of input ports.
This use of processes is similar to a monitor [Hoare74] or a critical section.

A second, more important feature of ports is that they provide a way to classify the 
messages sent to a process before messages are received. One application of this capability 
would be a process with several queues of pending messages that are serviced with some 
priority algorithm, not necessarily in the order in which the messages arrived. Ports also
allow a process to temporarily ignore one class of messages while exchanging messages with
other processes to complete some processing activity. This use of ports will be demonstrated
by the locking strategy discussed in Chapter 4. 

The differences between my model and the others are a reflection of different goals. 
My model is an attempt to provide a way to express applications for a distributed 
information system clearly, such that the effects of failures are well specified. Others have 
been more concerned with formality and minimization of the primitive concepts. 

2.2 Atomic Transactions Revisited

This section examines the problem of performing transactions atomically as expressed 
in the framework of the process model. We show how the simple definitions of transactions 
given in Chapter 1 can be stated in terms of the process model and show how to express the 
property that transactions are performed atomically as constraints on the order of execution 
of process steps. Several mechanisms that could be used to control this order of execution to 
achieve atomic transactions are discussed. 



2.2.1 Expressing Transactions in the Process Model

Recall that a transaction is a set of accesses to stored data items. The definitions of a
data management system given in the previous chapter can be mapped onto the process
model by using several processes (at least one for each site) which I refer to as data
managers. Each data manager maintains some of the data items as components of its local
process state. The process steps of a data manager perform the accesses to the data items
held by that manager. If a data item is replicated, with several sites having copies, then
several data manager processes maintain copies of that item.

A transaction in the process model consists of a set of process steps of the data 
manager processes which together carry out the accesses needed to perform the transaction. 
Each data manager may perform several steps in carrying out a single transaction. If
communication between managers is required to perform a transaction, then the output
messages of some of the process steps that perform that transaction will be used as input
messages in some of the other process steps performing the same transaction.

In addition to the data manager processes, which implement accesses to data items, 
there are transaction processes, which perform the function of translating from a high level 
description of the transaction to a set of messages to be sent to the data managers. These 
messages direct the data managers to perform the necessary accesses to carry out the 
transactions. A more detailed description of the function of the data managers and 
transaction processes is given in Chapter 4, which discusses mechanisms for performing 
transactions. 



2.2.2 Performing Transactions Atomically

Intuitively, a transaction is atomic if either all or none of its effects are visible to
other transactions. There are two ways in which one transaction may observe the effects of
other transactions: messages sent by steps of one transaction that are received by steps of
another transaction, and the modifications of local process states that are made by steps of
one transaction and later observed by steps of another transaction.

The first method of observation, direct message passing, rarely occurs. This is
because a transaction is a complete, independent processing activity and does not in general
communicate directly with other transactions. The exception to this case is that the user who
submits a transaction (by sending a message into the distributed information system) may
know of other transactions by having received messages sent from other transactions.
Controlling sequencing of transactions so that the order of transactions as perceived from
explicit message passing is consistent with their order as perceived from observations of
modifications to local state is relatively simple. For the moment, I will present a definition of
atomic transactions that ignores this method of observing ordering. Chapter 3 discusses this
problem further.

The second source of communication between transactions, the local process states, is 
much more important in most applications. Recall that for each process p there is an
ordering relationship <p that defines the relative order of occurrence of process steps of p. 
These local ordering relationships can be used to define an ordering of the transactions as 
follows: 



Transaction T1 < T2 iff there is a process p and process steps s1 and
s2 of p such that s1 is a part of T1, and s2 is a part of T2, and
s1 <p s2.



Thus two transactions are ordered if both contain steps of the same process. This ordering
is a reflection of which transactions may have directly observed effects of which other
transactions. A transaction T1 can also observe the effects of some transaction T3 indirectly
if there is some transaction T2 such that T3 < T2 (because of the order of process steps of
some process) and T2 < T1 (because of the order of process steps of some other process).
Indirect observation can occur because the effects of a transaction may depend on the values
that that transaction saw.

The condition that we require for a transaction to be atomic is that either all or none
of its effects be reflected in the data values seen by other transactions. If a group of
transactions is performed atomically, then the effects of those transactions (modifications
made to the values of data items and messages produced by the transactions) are the same as
if the transactions were performed serially in some sequence, with each transaction being
entirely completed before the next transaction in the sequence is begun. This requirement
can be expressed as a condition on the < ordering resulting from the execution of
transactions as follows:

Transaction t is atomic with respect to a set of transactions T if
there is no sequence of transactions t1, ..., tn in T such that ti < ti+1
for 1 ≤ i < n, and tn < t < t1. Equivalently, a set of transactions is
atomic if the transitive closure¹ of the < ordering, <*, is a partial
order on that set of transactions.
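
The condition can be checked mechanically. The Python sketch below is an illustration only: the function names are hypothetical, each process is assumed to be given as the list of its steps in <p order, and `owner` maps each step to the transaction of which it is a part.

```python
def transaction_order(histories, owner):
    """Return the set of pairs (Ti, Tj) such that Ti < Tj.

    histories: process -> list of its steps in <p order
    owner:     step -> transaction containing that step
    """
    order = set()
    for steps in histories.values():
        for i, s1 in enumerate(steps):
            for s2 in steps[i + 1:]:
                if owner[s1] != owner[s2]:
                    order.add((owner[s1], owner[s2]))
    return order


def is_atomic(order):
    """True if the < relation given as a set of pairs is cycle free."""
    successors = {}
    for a, b in order:
        successors.setdefault(a, set()).add(b)
    status = {}  # transaction -> "active" while on the search path, else "done"

    def visit(t):
        if status.get(t) == "done":
            return True
        if status.get(t) == "active":
            return False  # reached a transaction already on the path: a cycle
        status[t] = "active"
        ok = all(visit(u) for u in successors.get(t, ()))
        status[t] = "done"
        return ok

    return all(visit(t) for t in list(successors))
```

If two processes perform the steps of T1 and T2 in opposite orders, `transaction_order` yields both T1 < T2 and T2 < T1, and `is_atomic` reports the cycle.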

In order to insure that a set of transactions is performed atomically, we must insure 
that the < ordering resulting from any concurrent execution of those transactions is cycle free.
One way to insure this is by choosing the assignment of data items to data managers such 



1. Throughout this thesis, I will use the superscript * to denote a reflexive transitive
closure (i.e. x <* x for any x).



that for each transaction there is a single data manager that can perform that transaction.
Thus each transaction is seen by only one data manager, and no cycles in the < ordering can
arise.



This approach can be rejected because the assignment of data items to managers is
not solely under control of the system designer. The autonomy of individual sites dictates
that certain items must be managed by processes at certain sites. Some transactions may need
to access data items from several different sites. Because each process must be executed at
one site, there is no way to have one data manager process perform a transaction at several
sites.¹



Perhaps a more serious objection to this proposal is that it makes the addition of new
transactions, which access items in patterns that were not planned, difficult or impossible.
Adding a new transaction may require complete redesign of the system so as to allow the new
transaction to be performed by a single process.

We therefore must show how to coordinate transactions that involve process steps 
from several different processes. This can be accomplished by controlling the order in which 
the data managers perform the process steps which perform accesses of transactions. The 
next section discusses four primitive mechanisms that could be used in coordinating the 
process steps of the data managers. 



1. Recall that the specification of the effects of failures on the execution of a process was
greatly simplified by the fact that each process is executed at one site. Therefore, we do not
wish to abandon this assumption.



2.2.3 Primitive Synchronization Mechanisms in the Process Model

There are several mechanisms available in the process model that could be used to
constrain the order in which processing operations are performed by processes. These
mechanisms could be used to construct a solution to the problem of performing transactions
atomically, in much the same way as mechanisms such as Semaphores [Dijkstra68] or
Monitors [Hoare74] are frequently used to construct solutions to other synchronization
problems.

To achieve the goal of partial operability, the synchronization scheme for
transactions must allow a transaction that is purely local to one data manager to be
performed whenever a request to perform that transaction is sent to the data manager. Thus
synchronization mechanisms that do not allow such transactions to be performed promptly
should be avoided. The goal of partial operability will thus serve as a guide in selecting
synchronization techniques for transactions.

One synchronization technique that has already been introduced is the sequencing of 
messages sent between processes. Sequencing consists of guaranteeing that messages sent 
from one process to a port are received at that port in the same order in which they were 
produced by the process. As we shall see in the next chapter, robust and sequenced message 
communication is sufficient to provide proper synchronization of many kinds of transactions. 

Sequencing alone does not compromise the goal of partial operability. The only case
in which the constraint of sequencing prevents a message sent from a process p to a port q
from being promptly received and acted upon is the case in which there is a previous message
from p to q that has not yet been received at q. Using the implementation of robust
sequenced processes described earlier in this chapter, this situation is quickly remedied
whenever it occurs. Unfortunately, as we will demonstrate in Chapter 4, sequencing alone is
not sufficient to perform all transactions atomically.

A second technique that could be used to control the order of execution of
transactions is one that I call explicit locking. Explicit locking consists of postponing the
reception of some class of message by a process until some other message has been received.
Chapter 4 will discuss locking in greater detail and will introduce a mechanism for explicit
locking into the process model.

A synchronization scheme using explicit locking does not achieve the goal of partial 
operability. Using explicit locking, a data manager could postpone the reception of a request 
to perform some local transaction until that data manager had received other messages.
Explicit locking could cause the local transaction to be delayed indefinitely.

Sequencing and explicit locking both control the order of processing operations by 
controlling the order in which messages are received by processes. Another approach to the 
control of the order of execution of processing operations is to control what action is taken by
a process on receiving a message. The following two synchronization techniques use this
approach. 

One way in which a process can postpone the processing operation requested by a 
message that that process receives is to record the message in the local state of the process.
The stored message can be retrieved and acted on in a later process step. One could
call this technique squirreling. 

Using squirreling, a transaction local to one data manager can be delayed indefinitely
because the request to perform that transaction can be squirreled away indefinitely by that
data manager, pending reception of some other message. Thus squirreling does not achieve
the goal of partial operability.
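
A minimal Python sketch of a squirreling data manager follows. It is only an illustration: the message kinds "request" and "release", the reply kind "done", and the layout of the local state are all hypothetical.

```python
def step(state, message):
    """One process step of a data manager that squirrels early requests.

    state:   {"ready": bool, "squirreled": list of postponed requests}
    message: (kind, payload), where kind is "request" or "release"
    Returns the output messages produced by this step.
    """
    kind, payload = message
    outputs = []
    if kind == "request" and not state["ready"]:
        # Postpone the processing: record the request in the local state.
        state["squirreled"].append(payload)
    elif kind == "request":
        outputs.append(("done", payload))
    elif kind == "release":
        # A later step retrieves the stored requests and acts on them.
        state["ready"] = True
        outputs.extend(("done", p) for p in state["squirreled"])
        state["squirreled"].clear()
    return outputs
```

If the "release" message is delayed by a failure, every squirreled request waits with it, which is exactly the loss of partial operability described above.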

Another mechanism that can be used to postpone the processing requested by a
message is to have a process that receives a message that the process should not yet act on
send the message to another process. That other process would either act on the message, or
pass it on again, possibly back to the first process. This technique could be referred to as
buck passing. Buck passing also does not achieve the goal of partial operability, as a
request to perform a transaction could be deferred indefinitely by being passed from process
to process.

Both buck passing and squirreling are what could be called implicit locking
(because request messages are not explicitly postponed, but the requested processing is
postponed). Implicit locking is characterized by the fact that two or more process steps of the
data manager receiving a request are used to perform the processing requested by a message.

When two or more process steps of a single data manager are used to carry out a
transaction, the goal of partial operability is not achieved. If two or more process steps carry
out accesses for a transaction, then other transactions that access the items accessed in those
steps may have to be excluded from occurring between the two steps. If a failure delays the
second step of a data manager performing accesses for a transaction, then transactions local
to that data manager that must be excluded may be indefinitely delayed. If only one process
step (of two or more) of a data manager performs accesses for the transaction, then some
condition must be preventing those accesses from being performed by the first step of the
data manager. The manager must in effect be waiting for some message before it will
perform the accesses for the transaction. That message could be delayed indefinitely,
delaying the transaction.



To summarize, the sequencing mechanism is the only one of the techniques for
controlling concurrency in the process model that achieves the goal of partial operability. In
Chapter 3, we will demonstrate a mechanism that uses sequencing to provide control for
many processing operations. In Chapter 4 I demonstrate that sequencing alone is insufficient
for coordination of all possible transactions, and show that some mechanism in which two
process steps of some process are used to perform one transaction is needed.


2.3 Summary 

This chapter presents a semantic model for a distributed information system in which 
the effects of failures are well specified. The model combines features of Actors, Data Flow, 
and the Object Model. The model makes a strong distinction between two forms of 
communication: inter-process messages, and intra-process communication through shared
state information. 

Two different classes of failures in a distributed information system were discussed: 
site failures and communication failures. We showed two ways in which the process model 
could be extended in order to include a specification of how computation is affected by such 
failures. One extension (simple processes) was easy to implement, but allowed failures to 
have relatively severe effects. A second extension (robust sequenced processes) limits the 
visible effects of failures, but requires more overhead in its implementation. The remainder 
of this thesis will make use of robust sequenced processes in developing algorithms for 
performing transactions. 

The problem of performing transactions atomically is translated into the terminology
of this model, and a plan for an implementation of a distributed information system based
on the model is given. A condition for determining whether or not a transaction is atomic is
expressed in terms of the ordering relationships of the model.

Finally, techniques for controlling the order of execution of process steps were
discussed. One of these techniques (sequencing) was shown to be consistent with our goal of
partial operability. Other techniques allow a failure to delay the completion of a local
processing operation indefinitely, but as we shall demonstrate in Chapter 4 are necessary for
coordination of some kinds of transactions.



Chapter 3
Atomic Broadcasting



Many transactions performed by a distributed information system can be decomposed
into independent component operations, each of which is performed at one site and does not
depend on any other site. In the model of the previous chapter, each component of such a
transaction is performed by a single process step. All of the messages that form the inputs to
these process steps can be constructed in advance, before any step is performed. The
ordering of such a transaction relative to other transactions is controlled by the order in
which these messages are received.

In this chapter, I introduce a mechanism for atomic broadcasting, which distributes
a set of messages to a set of destination ports so that they are received atomically with respect
to other such sets. If an atomic broadcast is used to distribute the input messages for a
transaction with independent components, that transaction is performed atomically. Atomic
broadcasting is a simpler problem than that of coordinating arbitrary transactions.

3.1 Definitions

For convenience, I assume that all messages and all ports are uniquely identified.
Many processes receive messages at a single port only. For such processes, I will use one
identifier such as p to refer both to a process and the port at which that process receives
messages. Recall that for each process p there is an ordering <p on the messages sent to p
that reflects the order in which those messages are received. Each message m is included in
the order <p when it is received by p. The ordering <p for a process can be determined
from the execution history of p. For a process that receives messages at one port only, the
execution history can be represented as a list of messages received, as shown in Figure 3.1.
This representation can be viewed as a log, recording each message received by p as it is
received. The most recently received message in the execution history is at the top of the list.

Figure 3.1
Abbreviated Execution History

    m3   (most recently received)
    m2
    m1

    m1 <p m2 <p m3

I define a broadcast message to be a set of messages and destination ports. A
broadcast message B can be represented by a set of pairs, {[m1,p1], ..., [mn,pn]}, such that mi is
a message, and pi is the name of the port (and process) to which mi is sent. The individual
messages that make up a broadcast are referred to as the components of the broadcast. The
order in which a group of receiving processes receive a group of broadcast messages can be
derived from the order in which the components of those broadcasts are received by the
individual processes. The order of two broadcast messages B1 and B2 is defined as B1 < B2
if B1 contains a message m1 sent to a process p, and B2 contains a message m2 also sent to p,
and m1 <p m2. This definition is completely analogous to the definition of the < ordering
on transactions. Similarly, a broadcast message M is atomic with respect to some set of
broadcast messages if the < ordering on those messages is cycle-free.

Figure 3.2 illustrates the reception of three broadcast messages that were not atomic.
B1 includes two component messages, m1X for X and m1Y for Y. Similarly, B2 and B3
contain components for X and Z, and for Y and Z respectively. In this example, X receives a
component of B1 before one from B2, Y receives a component of B3 before one from B1, and
Z receives a component of B2 before one from B3. These ordering relationships constitute a cycle.
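
The cycle test for this example can be sketched in Python. This is only an illustration under a stated assumption: that one broadcast precedes another in the < ordering whenever some receiver gets a component of the first before a component of the second. The names broadcast_order and has_cycle are hypothetical and do not come from the thesis.

```python
from itertools import combinations


def broadcast_order(receptions):
    """Build the assumed '<' relation on broadcasts.

    `receptions` maps each receiver to the broadcasts whose components it
    received, listed in reception order.
    """
    edges = {}
    for order in receptions.values():
        for earlier, later in combinations(order, 2):
            edges.setdefault(earlier, set()).add(later)
            edges.setdefault(later, set())
    return edges


def has_cycle(edges):
    """Depth-first search for a cycle in the directed graph `edges`."""
    WHITE, GREY, BLACK = 0, 1, 2
    colour = {node: WHITE for node in edges}

    def visit(node):
        colour[node] = GREY
        for nxt in edges[node]:
            if colour[nxt] == GREY:
                return True
            if colour[nxt] == WHITE and visit(nxt):
                return True
        colour[node] = BLACK
        return False

    return any(colour[n] == WHITE and visit(n) for n in edges)


# Reception orders described for Figure 3.2, and an atomic alternative.
figure_3_2 = {"X": ["B1", "B2"], "Y": ["B3", "B1"], "Z": ["B2", "B3"]}
atomic = {"X": ["B1", "B2"], "Y": ["B1", "B3"], "Z": ["B2", "B3"]}

print(has_cycle(broadcast_order(figure_3_2)))
print(has_cycle(broadcast_order(atomic)))
```

The first call reports a cycle (B1 < B2 < B3 < B1); the second does not, because every receiver sees the broadcasts in an order consistent with B1, B2, B3.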

A second way in which two broadcast messages could be considered to be ordered is
if the sender of one message was one of the receivers of the other. For the moment, I will
ignore this kind of ordering relationship. A later section extends the notion of an atomic
broadcast described here to include such relationships.

Figure 3.2
Non-Atomic Broadcasting

B1 = {<m1X, X>, <m1Y, Y>}
B2 = {<m2X, X>, <m2Z, Z>}
B3 = {<m3Y, Y>, <m3Z, Z>}

B2 > B1     B1 > B3     B3 > B2



3.2 An Illustration of Atomic Broadcasting

The independence of the process steps to be coordinated in an atomic broadcast (the
steps that receive the messages that make up the broadcast message) makes coordination of
atomic broadcasts simpler than coordination of more general operations. A simple real world
analogy may help to illustrate this point. Consider an office in which all communication is
through interoffice memos. Sending some important notice to all employees about a change
in working procedures is an instance of an atomic broadcast. The notice should be sent
atomically, so that employees working on and communicating about the same project receive
the notice at the same point in their work. This can be accomplished relatively easily
through the office mail system. At one instant, all of the notices are entered into the mail
system and take their places in the queues of mail waiting to be delivered to and read by the
employees. After that, each employee will find the notice at the same point (relative to other
mail) in his list of messages. It does not matter that some employee on vacation may not see
the memo for a month or more, as he will eventually see it in the proper sequence relative to
other mail.

Compare this situation with that of a group project which requires a joint discussion
by a group of employees. To complete such a project atomically with respect to other work
in progress effectively requires that each group member set aside a certain time for the
discussion. Scheduling the meeting is a much more difficult problem than placing a notice
in each employee's in-basket. A second, more serious problem is that if the meeting has to be
suspended for some reason, the members of the group cannot work on any other project that
may conflict with the group effort, as the effects of such work will not be known to other
members of the group.



This analogy is crude, but gives a feeling of the differences involved. The
distribution of the memo as an atomic act is easy, because there are no constraints on when
the recipients actually read the memo. It is sufficient to place the memo in the correct
sequence in each employee's mail.

3.3 A Mechanism for Atomic Broadcasting

In this section, I present a mechanism for coordinating atomic broadcasting that uses
robust sequenced communication between processes to distribute the component messages of a
broadcast message to their destination ports. The solution uses processes that I refer to as
message forwarders to distribute these messages. Each message forwarder receives messages
at a single input port. A message forwarder has a single process step specification which can
be described by a function f(M) = {(m, p)}, mapping each message received to a set of output
messages and destination ports for those messages.

The messages received or sent by a message forwarder each contain a set of
component messages and destination ports. The components of each such message form a
subset of the messages that comprise some atomic broadcast. Each process step of a message
forwarder receives some input message and partitions the components of that message among
the output messages that it produces. For each such step, the output messages together
contain exactly the same set of components as the input message to that step.

The protocol for atomic broadcasting organizes all of the processes in the system
(message forwarders, transaction processes, and data managers) in a hierarchy. Each process
p has a unique parent f in the hierarchy. I will also describe this relationship by saying that
p is a child of f. I say that p and q are relatives if either p is the parent of q or q is the
parent of p. In the hierarchy used for this protocol, each process f that is the parent of some
other process is also a message forwarder, and there is a single message forwarder r which is
the root of the hierarchy, and is an ancestor of all other processes. The transaction processes
and data managers form the leaves of this hierarchy. Any hierarchy of message forwarders
can be used to perform atomic broadcasting. As we shall see, however, the organization of
the hierarchy determines the number of messages that must be sent to distribute each
broadcast, and should be made with some knowledge of the expected communication
patterns.

In order to send an atomic broadcast, a process formulates a single message 
containing a set of components, each of which specifies a message to be sent and a 
destination port. This single message is sent to any message forwarder that is above all of 
the destinations in the hierarchy. Recall that each step of a message forwarder partitions the 
components of the message received among the output messages produced. Each message
forwarder sends output messages only to its children in the hierarchy. On receiving a 
message, a message forwarder partitions the components of that message such that each 
component is sent to the child that is above the destination port of that component in the 
hierarchy. 
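
A single forwarding step of this kind can be sketched in Python. The sketch is illustrative only: the function partition and the lookup table child_above are hypothetical names, and the table is assumed to be available to each forwarder (how it is maintained is discussed in the implementation section).

```python
def partition(components, child_above):
    """One message-forwarder step: split an input message among children.

    `components` is a list of (message, destination_port) pairs.
    `child_above` maps each destination port to the child of this forwarder
    that is above (or is) that port in the hierarchy.
    Returns {child: components routed through that child}.
    """
    outputs = {}
    for message, port in components:
        outputs.setdefault(child_above[port], []).append((message, port))
    return outputs


# Hierarchy of Figure 3.3: r is the parent of f and X; f is the parent of Y and Z.
r_children = {"X": "X", "Y": "f", "Z": "f"}
f_children = {"Y": "Y", "Z": "Z"}

b1 = [("m1X", "X"), ("m1Y", "Y")]
at_r = partition(b1, r_children)        # step performed by the root r
at_f = partition(at_r["f"], f_children)  # step performed by f on r's output
print(at_r)
print(at_f)
```

Each step only regroups components; none is created or dropped, which is the property required of a message forwarder above.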

Figure 3.3 illustrates the operation of this protocol in distributing the three broadcast
messages shown in Figure 3.2. The processes are organized in a three-level hierarchy, where
f is the parent of processes Y and Z, and r, the root, is the parent of f and X. Figure 3.3a
shows the orderings for all processes after B1 and B2 have been received by r and B3 has
been received by f. Figure 3.3b shows an intermediate state in the distribution of messages
to X, Y, and Z. Figure 3.3c shows the final state when all components of all three broadcasts
have been received.



Figure 3.3
Coordinating Atomic Broadcasts with Message Forwarders

B1 = {[m1X, X], [m1Y, Y]}
B2 = {[m2X, X], [m2Z, Z]}
B3 = {[m3Y, Y], [m3Z, Z]}

Figure 3.3a: The Initial Execution State
Figure 3.3b: An Intermediate State
Figure 3.3c: The Execution State After Delivery of B1, B2, and B3




A brief argument for the correctness of the solution is given here. A more detailed
and more formal proof appears in an appendix to this thesis. Any two broadcast messages
B1 and B2 are initially sequenced by one message forwarder, the highest message
forwarder receiving messages connected with both broadcasts. Because the message stream
between any two processes is sequenced, the order of two broadcasts sequenced by a message
forwarder is preserved as the messages connected with those broadcasts travel down the
hierarchy toward their destinations. This sequencing insures that no pair of broadcasts in
the < ordering can form a cycle (i.e. there are no messages for which B1 < B2 and B2 < B1).

The proof that no larger cycles can arise is substantially more complicated. The 
proof of the message forwarder protocol given in the appendix covers cycles of all sizes. 
This proof uses the properties of the hierarchy to show that no cycles can be achieved 
without a violation of sequencing between a process and its parent in the hierarchy.

There are several desirable properties of this protocol that are not obvious. One is 
that each process executes a single process step for each broadcast. This mechanism does not
use locking as defined in Chapter 2. The solution insures that all processes, including the
message forwarders, receive the messages sent in distributing a broadcast message atomically.
The transaction synchronization mechanism described in the next chapter makes use of this
property. 

Another point to note is that the protocol works for structures of message forwarders
other than hierarchies. I will use the term synchronization network for the logical
organization of processes used in the protocol. A synchronization network is simply a
directed graph that describes the parent-child relationships among processes. The proof of
the message forwarder protocol given in the appendix depends on the synchronization
network only in that it requires that there be at most one path in the synchronization
network between any two processes. This property is, of course, satisfied by a hierarchy.

A second requirement that must be imposed on the synchronization network used in
the distribution of a broadcast B is that the destinations of the components of B must have a
common ancestor in the synchronization network. If this were not the case, there would be
no way to distribute B using the protocol, because there is no process to which B can be sent
initially. If we are designing a synchronization network capable of coordinating any
broadcast message involving a group of processes, then we must insure that all of those
processes have a single common ancestor. This requirement, taken together with the
requirement that there be at most one path between processes in the network, means that
such a synchronization network must be a hierarchy. If, however, the set of broadcast
messages (or at least their destinations) is known, and a synchronization network is being
designed specifically to distribute those messages, then it is possible that a non-hierarchical
network could be used. This is illustrated by the example in Chapter 6.
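
The two requirements on a synchronization network can be checked mechanically. The following Python sketch is an illustration, not the author's method: it assumes the network is given as an acyclic mapping from each forwarder to its children, it reads "at most one path" as "at most one directed path", and all function names are hypothetical.

```python
def count_paths(children, src, dst):
    """Number of distinct directed paths from src to dst (graph assumed acyclic)."""
    if src == dst:
        return 1
    return sum(count_paths(children, child, dst) for child in children.get(src, ()))


def at_most_one_path(children):
    """First requirement: no two processes are joined by more than one path."""
    nodes = set(children) | {c for kids in children.values() for c in kids}
    return all(count_paths(children, a, b) <= 1 for a in nodes for b in nodes)


def descendants(children, node):
    found = set()
    for child in children.get(node, ()):
        found |= {child} | descendants(children, child)
    return found


def entry_points(children, destinations):
    """Second requirement: forwarders that are ancestors of every destination."""
    return {n for n in children if set(destinations) <= descendants(children, n)}


hierarchy = {"r": ["f", "X"], "f": ["Y", "Z"]}
diamond = {"r": ["a", "b"], "a": ["Y"], "b": ["Y"]}   # two paths from r to Y
two_roots = {"f": ["X", "Y"], "g": ["Y", "Z"]}        # not a hierarchy

print(at_most_one_path(hierarchy), at_most_one_path(diamond), at_most_one_path(two_roots))
print(entry_points(two_roots, ["X", "Y"]), entry_points(two_roots, ["X", "Z"]))
```

The network two_roots is not a hierarchy, yet it satisfies the path requirement and can distribute broadcasts to {X, Y} or to {Y, Z}. A broadcast to {X, Z} has no entry point, which is why a network intended for arbitrary broadcasts must be a hierarchy.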

3.4 Other Ordering Restrictions on Broadcast Messages

The above protocol insures that each process receiving a component of a broadcast
receives that component in the same order relative to those of other broadcasts. Thus a
broadcast is atomic as viewed by the receivers. Recall, however, that there is another way in
which the processes may perceive ordering among broadcasts, in that the sender of one
broadcast may have been a recipient of other broadcasts.

In general, each process step that produces a broadcast may have received some
knowledge about other broadcasts. This potential knowledge can be described by the should
follow relationship among messages described below. Each message m sent by a process p in
a process step s should follow a message m' received by q whenever:

a) There is a message m" received by p in process step s or in a step
that preceded s, and m' and m" are components of the same
broadcast,

OR

b) There is a message m" received by p in step s or in a step that
preceded s, and m" should follow m'.

This relationship describes ordering constraints among messages that must be enforced in 
order to prevent the system from behaving anomalously if the correct interpretation of a 
message m sent by a process p to a process q depends on q having received messages
containing information that was derived from broadcasts received by p before p sent m. 

For example, if I could in one atomic broadcast send my paycheck to be deposited at
the bank and checks drawn on my account to pay monthly bills, it would be disturbing to me
if when one of those checks was sent to my bank to be cashed, it arrived before the deposit.
This kind of behavior does not violate the definition of an atomic broadcast given above.
In this example, there are two separate actions: my distribution of the deposit and payments,
and my creditor's sending of the check to the bank to be cashed. Each of these could be sent
in a separate atomic broadcast; however, they cannot be part of the same atomic broadcast, as
the debtor's action is not known until the check is received. Nothing in the definition of
atomic broadcasting prevents these two broadcast messages from being sequenced in the
apparently anomalous order, because the causal relationship between the two events that
produced these broadcast messages is not recognised.



Unfortunately, the protocol described above allows such anomalous sequencing to
occur. Consider the hierarchy shown in Figure 3.3. A message m sent to both X and Y
must be initially sent to the message forwarder r. It is possible for X to receive its
component, and construct and send a message to Y, and have this new message received by
Y, before the component of m sent to Y is received at Y.

A simple extension of the message forwarding system described above provides
correct sequencing. Each broadcast B must initially be sent to a message forwarder f in the
hierarchy that is an ancestor of the sender of B as well as an ancestor of the processes associated
with the destination ports of B.

Notice that if a component of some broadcast B has been received at any port, then 
any component of B that is destined for a process p and has not yet been received by p must 
be awaiting reception at the input port to some process that is an ancestor of p. The 
extended protocol prevents anomalous sequencing by insuring that a message B enters the 
hierarchy above all of the messages that B should follow. The sequencing of messages 
between the message forwarders then insures that any message that B should follow will be 
received at its ultimate destination before B. A more detailed proof appears in the appendix. 
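
The difference between the basic and the extended entry rule can be sketched in Python for the hierarchy of Figure 3.3. This is an illustrative sketch: the protocol permits any suitable ancestor, and picking the lowest one is an assumption made here for concreteness. The names ancestors, basic_entry and extended_entry are hypothetical.

```python
def ancestors(parent_of, process):
    """Proper ancestors of `process`, nearest first."""
    chain = []
    while process in parent_of:
        process = parent_of[process]
        chain.append(process)
    return chain


def lowest_common_ancestor(parent_of, processes):
    first, *rest = processes
    for candidate in ancestors(parent_of, first):
        if all(candidate in ancestors(parent_of, p) for p in rest):
            return candidate
    return None


def basic_entry(parent_of, destinations):
    """Basic protocol: a forwarder above all destinations (lowest one chosen)."""
    return lowest_common_ancestor(parent_of, list(destinations))


def extended_entry(parent_of, sender, destinations):
    """Extended protocol: the forwarder must also be an ancestor of the sender."""
    return lowest_common_ancestor(parent_of, [sender, *destinations])


# r is the parent of f and X; f is the parent of Y and Z.
parent_of = {"f": "r", "X": "r", "Y": "f", "Z": "f"}

print(basic_entry(parent_of, ["Y"]))          # where X's reply could enter before
print(extended_entry(parent_of, "X", ["Y"]))  # where it must enter now
```

Under the basic rule, X's message to Y could enter at f and overtake a component still queued at r. Under the extended rule it enters at r, behind any component of m that has not yet travelled down toward Y.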

This solution to anomalous sequencing is very simple (though the proof that this
solution works is somewhat complicated), and easily implemented. Therefore, I will only
consider the implementation of the more complete solution.



3.5 Implementation

In this section, I will present two simple implementations of the synchronization
protocol described above, one using point-to-point communication, and one taking
advantage of communication technology that makes distribution of one message to several
receivers relatively inexpensive. There are many optimizations that could be used to
improve these implementations. I present them merely to show that such a system could
easily be implemented, and that I have not ignored any difficult problems by making
unreasonable assumptions about the implementation of message forwarders.

3.5.1 Atomic Broadcasting Using Point-to-Point Communication

In chapter two I presented a simple implementation of robust sequenced
communication. This implementation can be extended to implement message forwarders.
Robust, sequenced communication insures that messages sent from a message forwarder to
some port arrive in the sequence in which they were produced, and are not lost. In addition
to proper sequencing, we must show how the hierarchy of message forwarders can be
maintained.

A serious problem of the protocol described above for message forwarders is that
each process that sends a broadcast message must know the location, in the hierarchy, of
each of the destinations of the components of that broadcast. This knowledge is necessary to
select a message forwarder that is above all of the destinations. A second problem is that
each process may send messages to a large number of ports. This is expensive using the
implementation of processes described in Chapter 2, because message queues must be
maintained for each such port.

I solve these problems by changing the protocol for distributing the components of a 
broadcast slightly so that each process need only communicate with its parent and children 
in the hierarchy. This is accomplished by changing the process step specification of a 
message forwarder so that if all of the components of a message received by a message 
forwarder are bound for descendants of that message forwarder, the message is partitioned 
among the children of the message forwarder as before. If, however, the destination of some
component is not a descendant of the message forwarder, the message is sent, intact, to the
parent of the message forwarder. 

To send a broadcast using this modified protocol, a process formulates a message 
containing a list of the component messages of the broadcast, and sends that message to its 
parent in the hierarchy. This message rises in the hierarchy until it is above all of the
destination ports of the components of the broadcast (as well as its sender). When the
message reaches a message forwarder that is above all of the destinations, it is distributed as
before. Each process communicates directly only with its immediate neighbors in the 
hierarchy, thus the number of message queues needed to maintain robust sequenced 
communication is small. 
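
The modified process step can be sketched in Python as follows. This is a minimal sketch under stated assumptions: each forwarder is assumed to hold a table (here called below) of the processes under each child, message queues and sequencing are not modelled, and the class Forwarder and function send are hypothetical names.

```python
class Forwarder:
    def __init__(self, parent, below):
        self.parent = parent   # name of the parent forwarder (None for the root)
        self.below = below     # {child name: set of processes at or below that child}

    def step(self, components):
        """Return the output messages as a list of (next_hop, components) pairs."""
        outputs = {}
        for message, port in components:
            child = next((c for c, procs in self.below.items() if port in procs), None)
            if child is None:
                if self.parent is None:
                    raise ValueError(f"unknown destination {port!r}")
                return [(self.parent, components)]   # not all below: send up intact
            outputs.setdefault(child, []).append((message, port))
        return list(outputs.items())


def send(forwarders, parent_of, sender, components):
    """Trace a broadcast from `sender`; return the (process, messages) hops taken."""
    hops = []
    pending = [(parent_of[sender], components)]
    while pending:
        node, comps = pending.pop(0)
        hops.append((node, [message for message, _ in comps]))
        if node in forwarders:
            pending.extend(forwarders[node].step(comps))
    return hops


forwarders = {
    "r": Forwarder(None, {"f": {"f", "Y", "Z"}, "X": {"X"}}),
    "f": Forwarder("r", {"Y": {"Y"}, "Z": {"Z"}}),
}
parent_of = {"f": "r", "X": "r", "Y": "f", "Z": "f"}

local = send(forwarders, parent_of, "Y", [("m3Y", "Y"), ("m3Z", "Z")])
wide = send(forwarders, parent_of, "Y", [("m2X", "X"), ("m2Z", "Z")])
print(local)
print(wide)
```

Because a message always starts at the sender's parent and only rises from there, the forwarder at which it turns downward is automatically an ancestor of the sender, as the extended protocol requires.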

We can now consider how the necessary information about the hierarchy could be
maintained. Each message forwarder f must know which processes lie below each of its
children in the hierarchy. This knowledge could be built into each message forwarder, or be
built into the structure of process names. If the life of the hierarchy exceeds the usefulness of
individual processes, however, we must expect that processes will be created, deleted, or even
moved in the hierarchy, and that these changes must be reflected to the message forwarders.

In showing how to add a process, I will assume a mechanism for generating unique 
process names, and will not attempt to solve the problem that a process being added to the
hierarchy may already be part of it. I also assume that it is possible to determine from a
port name which process receives messages at that port. This knowledge allows the message
forwarders to determine the destination process of a message from its destination port.

To add a process p to the hierarchy, some message forwarder f is selected to be the
parent of p. Process p informs f of this choice by sending a "request for adoption" message.
This message establishes the message queues and sequence numbers for sending messages
from p to f. Message forwarder f can reply to p either by accepting or rejecting this request.

If the request is accepted, the mechanism for sending messages from f to p is
established with the sending of the reply, and p can begin to send and receive atomic
broadcast messages. Message forwarder f sends a message to its parent which is propagated
up the hierarchy informing all processes that are now ancestors of p of the presence of p.
Before any message can be sent to p, the sender must be informed of the existence of p. Any
message that could inform a process of the presence of p must either have been sent by p or
should follow (as defined in the previous section) some message sent by p. The messages
that inform the message forwarders of the presence of p will always precede any message sent
by p (and therefore any message that should follow a message sent by p) at the ancestors of
p, because of the sequencing. Thus any message forwarder encountering a message with a
component sent to p is guaranteed to know whether or not p is one of its descendants.



Special care must be taken when the request for adoption is rejected.
Communication failures can cause either the request for adoption or the reply to that request
to be lost. We must be sure that loss of messages cannot cause p and f to become confused
such that one thinks that the request was successful while the other does not. Such confusion
is particularly likely if the request is re-transmitted by p if f does not respond promptly.
This problem is similar to that of initiating a connection in a communication protocol, such
as TCP [Cerf74] or DSP [Reed76]. The solution that I am using is similar to that of DSP.

When a message forwarder rejects a request for adoption, it may be doing so because
it has insufficient resources to establish communication with a new child. If this is the case,
we do not wish to burden the message forwarder with the task of remembering that it has
rejected a request. Therefore, we must keep in mind that if a message forwarder f is sent a
request for adoption several times (because the sender of the request re-transmitted the
request when f did not reply promptly with an acceptance), then f may first reject the request
and subsequently accept it. (Once a request has been accepted, however, the message
forwarder can know to accept any subsequent re-transmissions of that request.) This means
that a process that has sent a request may not negotiate with another potential parent if it
receives a rejection or no prompt reply. If the original request (or a re-transmission of the
original request) were later accepted, this could allow one process to have two parents in the
hierarchy. Thus we require that if a process receives a rejection (or no reply at all), it must
either keep trying (retransmitting its request) until it is accepted, or choose a new unique
name and attempt to establish communication with another parent.
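
The naming rule can be illustrated with a small Python sketch. It is deliberately simplified and is not the DSP-style protocol itself: message loss, queues, and sequence numbers are not modelled, a forwarder's resources are reduced to a hypothetical capacity count, and the names MessageForwarder, fresh_name and join are invented for the illustration.

```python
import itertools


class MessageForwarder:
    def __init__(self, capacity):
        self.capacity = capacity
        self.children = set()          # accepted requests are remembered

    def request_adoption(self, name):
        if name in self.children:      # re-transmission of an accepted request
            return "accept"
        if len(self.children) < self.capacity:
            self.children.add(name)
            return "accept"
        return "reject"                # rejections are not remembered


_counter = itertools.count(1)


def fresh_name():
    """Stand-in for the assumed mechanism that generates unique process names."""
    return f"p{next(_counter)}"


def join(candidates, max_tries=3):
    """Try each candidate parent in turn, never reusing a name with a new parent."""
    for forwarder in candidates:
        name = fresh_name()            # new name before approaching a new parent
        for _ in range(max_tries):     # keep trying under the same name
            if forwarder.request_adoption(name) == "accept":
                return name, forwarder
    return None


busy = MessageForwarder(capacity=0)
free = MessageForwarder(capacity=1)
name, parent = join([busy, free])
print(name, parent is free)
```

Because the name used with busy is abandoned before free is approached, a late acceptance by busy could only ever adopt a name that no live process answers to, so no process ends up with two parents.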

This approach may result in a message forwarder adopting a process that no longer 
exists (because that process has chosen a different name), but this does not cause a problem. 
No messages will ever be sent to or from such an abandoned process. An abandoned process
will be detected and deleted from the hierarchy through the same mechanism that deletes
processes that is described in the following paragraph.

A process can remove itself from the hierarchy by sending a message to its parent
notifying its parent of its intention to leave the hierarchy. When a message forwarder
receives such a request, or a message forwarder can reliably determine that one of its child
processes no longer exists, then that message forwarder can reclaim the message queues for
that process and inform the ancestors of the message forwarder of the disappearance of the
process. Once a process has left the hierarchy, it must choose a new name in order to re-join
unless it can be determined that no process remembers the old name.

A process p can be moved from one location in the hierarchy to another location in
the hierarchy in a series of small steps of the form shown in Figure 3.4. Each such step
changes the parent of p from f to g, where f is the parent of g, or g is the parent of f. Both
cases are essentially similar, so I will only describe the former.

1) p sends a message to f (the current parent of p) requesting
movement, and stops sending messages to f.

2) f receives this request, and performs an atomic modification of its
state with the following changes: A request to close is put at the end
of the output message queue for p, p's request is put in the output
message queue for g, and f's view of the hierarchy is updated to
reflect p's movement.

3) p receives the request for close from f and drops its now empty queue
of messages for f. p now sends a request for adoption to g.

4) g receives the request from f, establishes a queue for p, and
accepts p's request for adoption. g now updates its view of the
hierarchy to include p.

The last two steps take place in either order, depending on the relative timing of the
messages sent from f to p and g. No knowledge of the move must be propagated beyond f
and g. Note also that the transfer should be negotiated in advance to prevent g from
refusing the request for adoption.

Figure 3.4
Moving a Process

3.5.2 Atomic Broadcasting with a Broadcast Medium

In many communication architectures, it is no more costly to send a message to a set 
of receivers than to a single destination. A broadcast network, such as a ring network
[Farber72] or an Ethernet [Metcalfe76] has this property, as does communication through
shared memory on a single site. Our scheme for atomic broadcasting can be modified to 
take advantage of this ability to distribute component messages of a broadcast to several 
receivers. 

In the absence of errors, a broadcast network acts like a message forwarder. Each 
site presents its messages to the network. The network receives one message at a time, and 
distributes that message to the intended receivers. Messages sent through the network are 
totally ordered, just as messages sent through a forwarder. 

To send an atomic broadcast message to a set of receivers on the same network, all of
the component messages of that broadcast are packaged into a single message for the
network. If the packet size of the network is too small to hold all of this, the contents of each
component message can be pre-distributed to its intended receiver. The sender picks a
unique identifier for the broadcast and attaches it to each component message, sending the
component messages singly or in groups to the intended receivers. When such a component
message is received, it is saved by the receiver and not placed in the stream of incoming
messages. When all components have been distributed, the sender sends a message
containing the unique identifier to all receivers using the broadcast capability of the
communication network. The unique identifier is used by the receivers to identify the
component message that was pre-distributed and insert that message into its stream of
incoming messages. The broadcast networks designed thus far all have a packet size large
enough to accommodate such a unique identifier.
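
Pre-distribution followed by a single ordering message can be sketched in Python. The sketch assumes an error-free network that delivers each network-wide message to every receiver in the same order. The class Receiver and the function network_broadcast are hypothetical names.

```python
class Receiver:
    def __init__(self):
        self.held = {}       # broadcast identifier -> pre-distributed component
        self.incoming = []   # the stream of incoming messages

    def on_component(self, broadcast_id, component):
        """A pre-distributed component is saved, not yet delivered."""
        self.held[broadcast_id] = component

    def on_identifier(self, broadcast_id):
        """The short network-wide message releases the saved component."""
        if broadcast_id in self.held:
            self.incoming.append(self.held.pop(broadcast_id))


def network_broadcast(receivers, broadcast_id):
    """Stand-in for the broadcast medium: one message, seen by all, in one order."""
    for receiver in receivers:
        receiver.on_identifier(broadcast_id)


x, y = Receiver(), Receiver()

# Components of broadcasts A and B arrive in different orders at X and Y.
x.on_component("B", "b_x")
x.on_component("A", "a_x")
y.on_component("A", "a_y")
y.on_component("B", "b_y")

# The identifier messages are totally ordered by the network.
network_broadcast([x, y], "A")
network_broadcast([x, y], "B")
print(x.incoming, y.incoming)
```

The order in which the large components arrive does not matter; only the order of the short identifier messages determines each receiver's input stream.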

If very large messages are sent, it would seem that we are not obtaining any benefit 
from the availability of the broadcast mechanism over the point-to-point scheme described 
above. Note, however, that if we were to use the point-to-point scheme for coordinating 
atomic broadcasts among the processes executing on sites connected by a broadcast network, 
then in general each component of a broadcast message would have to be transmitted over 
the network twice, once to reach the common ancestor of the recipients, and once to reach its 
destination. The protocols of this section transmit each component of a broadcast message 
exactly once,¹ thus saving extra message transmissions.

If the network and sites were completely reliable, including all components of an 
atomic broadcast in a single message would be sufficient to distribute the component 
messages atomically. Unfortunately site failures or simple lack of buffering can cause a site 
to miss a message from the network. To solve this problem, there must be a mechanism that 
uniquely orders the broadcast messages, even if failures occur during the transmission of a 
broadcast. Such a mechanism would allow each site to know the order in which incoming 
broadcast messages should be processed by that site, even if failures cause some of the 
transmissions to the site to be lost or to arrive out of sequence. A mechanism must also be
provided to allow a site that has missed a message to obtain a copy of that message. 



1. This excludes re-transmissions necessitated by errors.



This can be accomplished by appointing one site as the coordinator of the
broadcasting. The coordinator has the responsibility for arbitrating the broadcast messages 
on the network, and does so by assigning a sequence number to each. To send an atomic 
broadcast, a site assigns a unique identifier to that broadcast and transmits the components 
of that broadcast in one or more transmissions on the broadcast network. Each transmission 
is identified with the unique identifier, so that the receivers can identify the transmissions 
that are used to distribute an atomic broadcast. 

These transmissions are seen by every node on the broadcast network, including both
the recipients and the coordinator site. Each recipient receives and stores its component of
the broadcast from the transmissions used to distribute that component. This stored
component is not yet included in the input message queue of the receiving process at that
site. The coordinator receives and stores all of the components. When the coordinator has
received all of the components of an atomic broadcast, the coordinator assigns a sequence
number to that broadcast and transmits a message to all sites containing the sequence
number, the name of the sending site, and the sending site's unique identifier for the
broadcast. This message informs all receivers of components of that broadcast of the proper
sequence in which that broadcast is to be included with components from other broadcasts.
The message from the coordinator also serves as an acknowledgement to the sender of the
broadcast that the broadcast has been distributed and the sender can delete it from its output
message queues.

It is relatively simple to see that this scheme works if no errors occur, as it is
essentially the same as the scheme for distributing large broadcasts in the absence of errors
described above, with the exception that the coordinator distributes the single message that
directs all receivers to include the broadcast in their input message queues, rather than
having the sender of the broadcast do this. To see how this scheme also works in the event
that messages are lost on the broadcast network, let us consider the possible errors.

One error that can occur is that one of the transmissions used to distribute the
components of the broadcast is not received by one or more sites. If it is the coordinator that
misses one of these transmissions, then the coordinator will never detect the broadcast as
being complete, and will not send the sequence number message. After a suitable timeout
interval, the sender of the broadcast can detect that something is amiss (because it does not
receive the message from the coordinator) and can retransmit the components. Any site that
received the components correctly the first time can identify and discard the retransmission
because of the unique identifier assigned by the sending site.

If one of the receivers fails to receive a component correctly, but no other errors
occur, then eventually the coordinator will transmit the sequence number for the broadcast.
The receiver will discover that it has not stored the component for the broadcast identified
in the message sent by the coordinator, and can request retransmission of that component by
the coordinator. Thus the coordinator also acts as a backup for obtaining copies of lost
messages.

Another error that can occur is that the message sent by the coordinator may be
missed by one or more sites. If the sender of the broadcast does not see this message, it will 
begin a needless retransmission, which again can be discovered and discarded by the 
receivers. The coordinator can retransmit its message to acknowledge the distribution of the 
broadcast to the sending site. 



If one of the receivers misses the coordinator's message, this may not be immediately
detected. The receiver will detect that it is out of date when it next receives a message from
the coordinator. That receiver can then request retransmission of the messages that it has
missed from the coordinator.
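
The recovery cases above can be sketched in Python. This is a simplified illustration rather than a full implementation: timeouts and the network itself are not modelled, a request for retransmission is represented by a direct method call, and the names Coordinator, Site, sequence and retransmit are hypothetical.

```python
class Coordinator:
    def __init__(self):
        self.next_seq = 1
        self.saved = {}      # sequence number -> (uid, {site: component})
        self.assigned = {}   # uid -> sequence number

    def sequence(self, uid, components):
        """Assign a sequence number once all components of `uid` have been seen."""
        if uid not in self.assigned:             # a retransmission keeps its number
            self.saved[self.next_seq] = (uid, dict(components))
            self.assigned[uid] = self.next_seq
            self.next_seq += 1
        return self.assigned[uid]

    def retransmit(self, seq, site):
        uid, components = self.saved[seq]
        return uid, components.get(site)         # None if `site` is not a receiver


class Site:
    def __init__(self, name, coordinator):
        self.name = name
        self.coordinator = coordinator
        self.stored = {}         # uid -> component held outside the input queue
        self.last_seq = 0        # most recent sequence number processed
        self.input_queue = []

    def on_component(self, uid, component):
        self.stored.setdefault(uid, component)   # a retransmitted copy is discarded

    def on_sequence_message(self, seq, uid):
        if seq <= self.last_seq:
            return                               # duplicate announcement
        for s in range(self.last_seq + 1, seq + 1):
            if s == seq and uid in self.stored:
                component = self.stored.pop(uid)
            else:                                # missed announcement or component
                s_uid, component = self.coordinator.retransmit(s, self.name)
                component = self.stored.pop(s_uid, component)
            if component is not None:
                self.input_queue.append(component)
        self.last_seq = seq


c = Coordinator()
x, y = Site("X", c), Site("Y", c)

# First broadcast: Y misses both its component and the coordinator's message.
x.on_component("s1#1", "a_x")
seq1 = c.sequence("s1#1", {"X": "a_x", "Y": "a_y"})
x.on_sequence_message(seq1, "s1#1")

# Second broadcast: received by everyone; Y notices the gap and catches up.
x.on_component("s2#1", "b_x")
y.on_component("s2#1", "b_y")
seq2 = c.sequence("s2#1", {"X": "b_x", "Y": "b_y"})
x.on_sequence_message(seq2, "s2#1")
y.on_sequence_message(seq2, "s2#1")
print(x.input_queue, y.input_queue)
```

Both sites end with the two broadcasts in the same relative order, even though Y learned of the first one only when the second announcement revealed the gap in sequence numbers.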

The protocol described above for atomic broadcasting using a broadcast
communication network is relatively simple, makes efficient use of the network if no errors
occur, and works correctly if messages are lost or duplicated by the network. There are
several points about this protocol that must be clarified before it can be used as the basis for
a practical implementation of atomic broadcasting.

The coordinator site must record all of the broadcast messages, and must keep each
broadcast until it knows that that broadcast has been received by all receivers. In order to
avoid having to save broadcasts forever, we can have each site periodically send a message
containing the sequence number of the most recent broadcast that that site has received
correctly. The coordinator can use these messages to determine when it is safe to delete a
saved broadcast message, and when a site is out of date and should be sent information
about one of the saved broadcasts. The message sent by the coordinator must identify which
of the sites are receivers of the broadcast. This information can be determined from
examining the components of the broadcast, and can be encoded in the message by using a
bit vector with one bit for each site indicating whether or not that site is a receiver of the
broadcast. The bit vector is used by a receiving site in order to determine whether or not
that site should have received a component of the broadcast. This in turn tells the site
whether or not it missed the transmission of the component by the sender.
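
The bookkeeping described above might be sketched as follows. All names (`Coordinator`, `sequence`, `on_ack`, `is_receiver`) are hypothetical, and the sketch assumes a fixed, known list of sites whose order defines the bit positions in the vector.

```python
class Coordinator:
    """Hypothetical coordinator log for sequenced broadcasts."""

    def __init__(self, sites):
        self.sites = list(sites)                 # site order defines bit positions
        self.acked = {s: 0 for s in self.sites}  # last sequence number each site reported
        self.saved = {}                          # seq -> (receiver bit vector, components)
        self.next_seq = 1

    def sequence(self, components):
        """Assign the next sequence number to a broadcast given as {site: component}."""
        bits = 0
        for i, site in enumerate(self.sites):
            if site in components:
                bits |= 1 << i                   # bit i set: site i is a receiver
        seq = self.next_seq
        self.next_seq += 1
        self.saved[seq] = (bits, dict(components))
        return seq, bits

    def on_ack(self, site, seq):
        """Record a periodic report and delete broadcasts that every site has passed."""
        self.acked[site] = max(self.acked[site], seq)
        safe = min(self.acked.values())
        for old in [s for s in self.saved if s <= safe]:
            del self.saved[old]


def is_receiver(bits, site_index):
    """Receiver-side test of the bit vector carried in the coordinator's message."""
    return bool((bits >> site_index) & 1)
```

Taking the minimum over all reported sequence numbers is the simplest safe deletion rule: a saved broadcast is dropped only once no site can still ask for it.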


Each site must keep track of the most recent sequence number sent by the coordinator
that has been seen and correctly processed by the site. In a typical application of this
protocol, it might be the case that each site is a receiver in relatively few of the atomic
broadcasts. If this is the case, it may be necessary to filter the messages sent by the sender
and by the coordinator in the receiver's network interface in order to avoid interrupting the
receiver unnecessarily. This could be done by maintaining a register in a site's network
interface, which contains the sequence number of the most recent broadcast that that site has
correctly processed. When a message from the coordinator is seen by the network interface, it
examines the message to determine whether or not the sequence number in that message is
one greater than that in the register in the network interface. If this is so, and if the message
does not describe a broadcast in which the site is a receiver, then the register is incremented,
and the receiving site is not interrupted. If a message from the coordinator does not meet
these conditions, then it is reported to the receiving site, which either detects that the
sequence number indicates that the receiving site is out of date, or that the message pertains
to the receiving site. If the message pertains to the receiving site, then the receiving site
incorporates the broadcast described by the message into its input message queue (in stable
storage), and then updates the sequence number in its network interface. 1 Otherwise, the
receiver requests retransmission of the missed message(s) from the coordinator.



1. Notice that if a second message comes in before a message received by a site has been
incorporated, the sequence number in the network interface of the site may be out of date.
This causes no problem, as the site detects that it has missed the second message and
immediately obtains it. The sequence number cannot be updated before the message has
been recorded in the input message queue, as a failure of the site may cause the message that
has been received but not yet recorded in the queue to be lost.
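
The filtering rule for the network interface can be sketched as a small state machine. The class and method names are hypothetical; the sketch models only the register and the interrupt decision, and the host is assumed to call `host_recorded` after the broadcast is safely in its stable input queue.

```python
class NetworkInterface:
    """Hypothetical interface filter holding one sequence-number register."""

    def __init__(self, site_index):
        self.site_index = site_index
        self.register = 0  # sequence number of the last broadcast correctly processed

    def on_coordinator_message(self, seq, receiver_bits):
        """Return True if the host must be interrupted, False if the message is filtered."""
        in_order = seq == self.register + 1
        for_me = bool((receiver_bits >> self.site_index) & 1)
        if in_order and not for_me:
            self.register += 1  # nothing for this site: advance without interrupting
            return False
        return True             # out of date, or the message pertains to this site

    def host_recorded(self, seq):
        # Called by the host only after the broadcast is in its stable input queue.
        self.register = seq
```

Keeping the register update for relevant messages under the host's control mirrors the ordering constraint in the footnote: the register never runs ahead of stable storage.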



3.5.3 Use of Broadcast Networks and Point-to-Point Communication Together

The schemes for providing synchronization of atomic broadcasts using a broadcast
network can be used in conjunction with the message protocols for point to point
communication in a network with a number of different physical communication media. To
do so most efficiently, all of the processes running at sites linked by a broadcast network
should be made children of a single message forwarder representing the network. Other
broadcast networks and sites are linked through message forwarders representing gateways
connecting networks.

To see how this is done, consider the physical communication topology shown in
Figure 3.5. The physical configuration is three broadcast subnetworks, with sites F and C
acting as gateways between Net1 and Net2, and between Net2 and Net3 respectively. One
possible efficient hierarchy for this network is shown in Figure 3.6. This figure is a skeleton
hierarchy showing one message forwarder for each site. The processes at a site would be
descendants of the single message forwarder shown for that site. Consider a broadcast
message sent by a process at site G to processes at sites D, E, H, and I. Site G would use the
broadcast network Net2 to distribute components to sites D, E, and F. The message
forwarders at D and E would route their components to the proper destination processes.
The message forwarder at F would use Net1 to distribute the messages for H and I.

3.6 Evaluation 

The algorithm described here for coordination of an atomic broadcast is only one of
many that could have been used for this purpose. The desirability of this algorithm as
opposed to the others depends mainly on the extent to which the hierarchy of message
forwarders reflects the logical and physical communication paths in the distributed
information system.

Figure 3.5
A Physical Communication Topology

Figure 3.6
A Logical Topology for the Network of Figure 3.5

I have already argued that many applications for a distributed information system 
exhibit a strongly hierarchical organization. This is a reflection of hierarchical management 
policies. If the hierarchy of message forwarders is chosen so that processes that need to 
communicate frequently are nearly always children of the same message forwarder, the 
message forwarder scheme involves little extra message passing beyond direct communication 
between processes. 



This is particularly true if the physical communication network is also hierarchical. 
If the physical communication network is hierarchical (counting broadcast networks as a
single node in that hierarchy), then the atomic broadcasting mechanism described here is as 
reliable as any other communication mechanism. Each message follows the shortest path in 
the hierarchy between its source and destination. Two transmissions take place for each link 
in the hierarchy that a message traverses (one carrying the message and one carrying an 
acknowledgement). This is the minimum number of messages needed to deliver a message 
reliably, and the synchronization adds no extra messages. 
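
The message count claimed above can be checked with a short calculation. The sketch below assumes the hierarchy is given as a hypothetical `parent` map (child to parent) and counts one message plus one acknowledgement for every link on the path through the lowest common ancestor; the node names in the usage note are invented.

```python
def path_links(parent, src, dst):
    """Number of hierarchy links between src and dst via their lowest common ancestor."""
    def ancestors(node):
        chain = [node]
        while node in parent:
            node = parent[node]
            chain.append(node)
        return chain

    up = ancestors(src)
    down = ancestors(dst)
    common = next(a for a in up if a in down)  # lowest common ancestor
    return up.index(common) + down.index(common)


def transmissions(parent, src, dst):
    # One transmission carries the message and one carries the acknowledgement.
    return 2 * path_links(parent, src, dst)
```

For example, with `parent = {"P1": "F1", "P2": "F1", "F1": "Root", "P3": "F2", "F2": "Root"}`, two processes under the same forwarder exchange a message in 4 transmissions, while a message from `P1` to `P3` must cross the root and costs 8, which is the locality effect discussed in this section.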

If, however, the physical communication network is strongly non-hierarchical, with
many alternate paths between any two sites, imposing a logical hierarchy may cause
communication between some sites to be very inefficient, where a direct link between those
sites exists. This problem can be alleviated to some extent by sending the contents of all
large messages over the shortest possible route, and sending a message header through the
hierarchy to designate when the pre-distributed message contents are to be included in the
incoming message stream of the receiver, as was done for broadcast networks with small
packet sizes. This technique reduces the communication overhead due to the hierarchy, but
does not reduce the vulnerability of the hierarchy to failures. If much communication is
local, however, this vulnerability may not be a problem.

The message forwarder scheme has several advantages over other synchronization 
mechanisms for distributed systems that could be used to coordinate atomic broadcasting. 
One advantage is its simplicity. The message forwarder protocols can all be described by 
simple statements. Each step is deterministic, and the only source of non-determinism is the 
order in which two messages sent to the same process are received. 



The inability of two processes to determine reliably whether or not some message sent
between them was received does not cause a problem in the message forwarder scheme.
Using the protocols described above, once a message has been sent, the sender assumes that
that message could have been received, and does not take any action inconsistent with that
assumption. Once a message has been sent, it is retransmitted indefinitely until it gets
through.

Another interesting feature of this solution is that the sender of a broadcast need not
participate in the completion of a broadcast. Once the broadcast message has been delivered
to a message forwarder, it will eventually be delivered to all receivers, even if the sender
crashes. The sender of a broadcast cannot, however, know when that broadcast will be
delivered, as that depends on the availability of the message forwarders and receiving ports,
and on the order in which messages are received by those ports. The broadcasts from one
sender are, however, delivered in the same order in which they were sent.

A third distinctive feature is that the order in which a broadcast is received relative
to other conflicting broadcasts is not determined in one decision. The decision is distributed
among the message forwarders through which the messages of one broadcast pass, each of
which performs some arbitration. In a scheme using timestamps to arbitrate between
concurrent messages, once a timestamp has been assigned to a message its order relative to
other messages has been fixed. Postponing this decision by distributing it among the
message forwarders provides greater flexibility that can be important in some circumstances.

Even after some of the component messages of a broadcast have been received by
their destination ports, other messages from the same broadcast may still be held by the
message forwarders, and thus other non-conflicting broadcasts sent later may be received
earlier. This flexibility is important if the communication network connecting ports
partitions, in that broadcasts local to one or the other of the partitions can continue to take
place, even if there are messages from more "global" broadcasts that have not yet been
delivered. The extended protocol and the implementation discussed above guarantee that
this flexibility does not allow messages to arrive out of order, in that any port p receiving a
message B will have received any message that the sender of B could have been aware of
before receiving B.

The message forwarder scheme takes advantage of "locality of reference" in 
communication more effectively than many other synchronization schemes that could be 
applied to atomic broadcasting. Some schemes, such as those using timestamps, require 
periodic communication among all of the sites. Such a scheme would be expensive for a 
distributed information system in which most operations involve only one or a few sites. 
Sending an atomic broadcast using message forwarders, in contrast, requires only the 
participation of the sender and receivers, and possibly a few additional sites holding message 
forwarders. 

One point that remains to be explored is to determine exactly what kinds of 
operations can be performed using an atomic broadcast. This question will be answered in
the next chapter. 

3.7 Summary

This chapter has discussed one simple synchronization problem in a distributed 
information system: that of sending a set of messages to a set of destinations "atomically". A 
mechanism was developed to provide the proper synchronization by using message 
forwarders to distribute atomic broadcasts to their receivers. The mechanism was extended
to prevent anomalous behaviour if correct interpretation of one message depends on prior
reception of some message.

Two implementation strategies for message forwarders were presented. One
implementation that was independent of the physical communication network, using robust
sequenced processes, was developed. The protocols allow processes to be added, deleted, or
moved within the hierarchy of message forwarders. A more efficient implementation that
takes advantage of a broadcast communication network was also outlined.




Chapter 4 
Atomic Transactions in the Process Model



In this chapter, the problem of performing transactions atomically in a distributed 
information system described by the process model of Chapter 2 is considered in greater 
detail. A method is presented for describing the data flow that a transaction causes among
the items that it accesses. The difficulty of coordinating transactions to be performed 
atomically is shown to be dependent on the interaction of their data-flow descriptions. 

A synchronization scheme consistent with the goals set forth in the first chapter is 
developed. This scheme makes use of the hierarchical mechanism for atomic broadcasting 
described in Chapter 3. The mechanism is simple, efficient, and frequently avoids locking. 

4.1 Analysis of Transactions 

The techniques needed for synchronizing a set of concurrent transactions are
dependent on the data flow among data items caused by performing the transactions. The
set of input items to each transaction and the way in which those inputs are reflected in the
updates made by that transaction affect the way in which transactions interact. I will use an
abstraction which I refer to as a transaction graph to describe the data flow between items
caused by performing a particular transaction.

A transaction graph is a directed graph in which the nodes are the data items in the
data base. The arcs show how the output items of a transaction are derived from the
input items to that transaction. The transaction graph for a particular transaction T
contains directed arcs pointing at each item that is updated by T. For each such item i,
there is an arc running from each item j such that the new value given to i by T depends on
the value of j seen by T.

Figure 4.1 shows the transaction graph for a simple banking transaction. This
transaction modifies the values of three items, x, y, and z. The transaction could represent a
bank's action on cashing a $50 check for a customer, where x represents the amount of cash
disbursed by the bank, y represents the account balance, and z represents the customer's
"overdraft protection" loan account. 1

Figure 4.1
A Simple Transaction Graph

(Graph with nodes x, y, and z, including an arc from y to z.)

The Transaction T:

    Set x = x - 50;
    If y < 50 then do;
        Set z = z + y - 50;
        Set y = 0;
    end;
    else Set y = y - 50.



1. In this simple example, we assume that the overdraft protection is unlimited and ignore
any other bookkeeping that must be done by a "real" banking system.
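
As an illustration only, the transaction T of Figure 4.1 and its transaction graph can be written out as follows. The function `run_T`, the dependency map `T_DEPENDS_ON`, and the helper `transaction_graph` are hypothetical names; the dependency map is read off the statements of T by hand, following the definition of an arc given above.

```python
def run_T(x, y, z):
    """Transaction T of Figure 4.1 as a pure function of the old values of x, y, z."""
    new_x = x - 50
    if y < 50:
        new_z = z + y - 50  # the shortfall is charged to the loan account
        new_y = 0
    else:
        new_z = z
        new_y = y - 50
    return new_x, new_y, new_z


# Updated item -> items whose old values its new value depends on.
T_DEPENDS_ON = {"x": {"x"}, "y": {"y"}, "z": {"y", "z"}}


def transaction_graph(depends_on):
    """Return the set of arcs (source item, destination item)."""
    return {(src, dst) for dst, sources in depends_on.items() for src in sources}
```

The only arc between distinct items is the one from y to z; the remaining arcs connect an item to itself.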



The transaction graph depicts the way in which the outputs of the transaction are 
computed. The arc from y to z in the transaction graph of T reflects the fact that the value 
for y must be obtained before the value produced by T for z can be determined. Such arcs 
describe constraints on any implementation of a transaction in that the access to an item that 
is the source of some arc must be performed before the access to the item that is the 
destination of that arc. 1 

In the process model of a distributed information system, a transaction is carried out 
as a set of process steps. A transaction graph can be used to construct a similar abstraction, 
which I refer to as an activity graph, describing the data flow among the process steps that 
implement a transaction. Two points cause an activity graph for a transaction to differ from 
its transaction graph: 

1) Several of the items accessed by a transaction may be held by a 
single data manager, allowing all of the accesses to those items to be 
performed in a single process step. 

2) Some data items may be replicated, with copies held by several 
sites. This means that one access in the transaction graph may be
performed by several process steps in the activity graph. 

1. Note that if a transaction graph contains a cycle, this means that some item in the cycle
must be accessed at least twice in any implementation.

The nodes of an activity graph are the processes that participate in performing the
transaction. For each arc from an item x to an item y in the transaction graph for T, the
activity graph contains one arc pointing to each manager that holds a copy of y from some
manager holding a copy of x. Arcs connecting a process to itself are not shown. If an item x
which is the source of some arc in the activity graph of T is replicated, then we have a
choice of which copy of x to use in computing the output of T dependent on x. This choice
is reflected by the arc in the activity graph of T connecting some process holding a copy of x
to the process that holds an item whose new value depends on x. If transactions are run
atomically, then all copies of a replicated item seen by a transaction have the same value, and
thus the choice of which one to use will not affect the output values produced by the
transaction.

Figure 4.2 shows the activity graph for an implementation of the transaction depicted
in Figure 4.1, in which each of the items is replicated at two of the three data managers.
The graph indicates several decisions made about the implementation of T. M1 holds copies
of items x and y. The new values produced for these items depend only on their previous
values, so a decision has been made so that M1 is to compute the new values for its copies of
these items from their previous values at M1. Similarly, M2 is to use the old values of the
copies of y and z that it holds to compute their new values. M3, however, holds a copy of z,
but no copy of y from which to compute the new value of z. A decision has been made that
M3 is to obtain this information from the copy of y held by M2.
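
The derivation of an activity graph from a transaction graph and an assignment of items to managers can be sketched as below. This is a hedged illustration: `activity_graph` and its `choice` parameter are hypothetical, a manager holding its own copy of the source item is assumed to use it (giving a self-arc that is not shown), and where no explicit choice is supplied the sketch arbitrarily picks the alphabetically first holder.

```python
def activity_graph(tx_arcs, holders, choice=None):
    """tx_arcs: set of (source item, destination item).
    holders: item -> set of managers holding a copy.
    choice: optional map (source item, destination manager) -> supplying manager."""
    choice = choice or {}
    arcs = set()
    for src, dst in tx_arcs:
        for m_dst in holders[dst]:
            if m_dst in holders[src]:
                continue  # local copy available: arc from a process to itself, not shown
            m_src = choice.get((src, m_dst), min(holders[src]))
            arcs.add((m_src, m_dst))
    return arcs


# Assignment described in the text: M1 holds x,y; M2 holds y,z; M3 holds x,z.
HOLDERS = {"x": {"M1", "M3"}, "y": {"M1", "M2"}, "z": {"M2", "M3"}}
T_ARCS = {("x", "x"), ("y", "y"), ("y", "z"), ("z", "z")}
```

Calling `activity_graph(T_ARCS, HOLDERS, {("y", "M3"): "M2"})` reproduces the single arc from M2 to M3; omitting the choice yields an arc from M1 to M3 instead, which is the other implementation permitted by the replication of y.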

Notice that in this example, all three managers participate in the computation of the
outputs of the transaction. This results in some duplication of effort, as, for example, both
M1 and M3 compute new values for x. We could have centralized the computation of the
outputs of the transaction in one of the three managers and distributed the results to the
other managers, which would have led to a radically different activity graph.

The model of a transaction used in this thesis, in which various parts of a
transaction are performed in parallel, is different from the model used by many other
researchers in which the accesses required to carry out a transaction take place in some well
defined sequence. Allowing for parallel execution of various parts of a transaction not only
allows the transaction to be completed faster, but also simplifies the task of synchronization
because the synchronization mechanism can choose the order in which two parts of a
transaction that are logically independent (such as those performed by M1 and M2 in this
example) are performed.

The arcs in an activity graph represent constraints on the order in which the process 
steps used to perform a transaction can occur. Some step of a process that is the source of 
one of these arcs must be completed before some step of the process that is the destination of 
that arc. Recall that performing a transaction atomically with respect to other transactions
also constrains the order in which process steps occur. The difficulty of coordinating a
group of transactions to be performed atomically depends on the interaction among their
activity graphs.

For a group of transactions, we can construct a joint activity graph, which is a
merger of the activity graphs of the individual transactions. The joint activity graph
contains an arc between two processes whenever the activity graph of some transaction in the
group contains such an arc. Each arc is labeled with the names of the transactions that
contribute that arc. Figure 4.3 shows an example of such a graph for three simple
transactions.

Figure 4.2
An Activity Graph For an Implementation of T

(Graph with nodes M1, M2, and M3, with an arc from M2 to M3.)

Assignment of Items to Managers

M1: x,y    M2: y,z    M3: x,z

Each of the three transactions is responsible for one of the three arcs in their joint
activity graph. This is because each transaction transfers information from an item held by
one manager to an item held by some other manager.
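
Merging activity graphs into a labeled joint activity graph is mechanical, as the following sketch suggests. The function name is hypothetical, and the example input in the usage note assumes that item C is held by a third manager, called M3 here purely for illustration.

```python
from collections import defaultdict


def joint_activity_graph(activity_graphs):
    """activity_graphs: transaction name -> set of (source manager, destination manager).
    Returns a map from each arc to the set of transactions that contribute it."""
    joint = defaultdict(set)
    for name, arcs in activity_graphs.items():
        for arc in arcs:
            joint[arc].add(name)  # label the arc with its contributing transaction
    return dict(joint)
```

For instance, `joint_activity_graph({"T1": {("M1", "M2")}, "T2": {("M2", "M1")}, "T3": {("M1", "M3")}})` yields three arcs, each labeled with exactly one transaction, and the arcs contributed by T1 and T2 form a cycle between M1 and M2.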

Activity graphs and joint activity graphs can be viewed as finer grained versions of
the L-U graphs used to describe transactions in SDD-1 [Bernstein77]. The analysis of
transactions in SDD-1 does not examine the derivation of outputs from inputs, but instead
assumes that each output of a transaction may depend on any of the inputs. In fact, each
output may depend on only a small subset of the values read, a fact that is represented in
activity graphs.

Activity graphs provide a simple way of describing the way in which input values
seen by a transaction affect the output values produced. The arcs in an activity graph also
describe ordering relationships among the process steps that carry out a transaction in that
the process step at the source of some arc must be completed before the process step that is
the destination of that arc.

The next section of this chapter examines the impact of the patterns of accesses of a 
group of transactions, as described by their joint activity graph, on the synchronization 
techniques that must be used to coordinate those transactions. 




Figure 4.3
A Joint Activity Graph

Transactions:

T1: Set B = B + A
T2: Set A = A + B
T3: Set C = C + A

Assignment of Items to Managers:

M1: A
M2: B



4.2 A Simple Approach to Transaction Synchronization 

In the previous chapter, I presented a simple mechanism to distribute a set of 
messages to a set of receivers as an atomic broadcast. This mechanism could be used to 
distribute a set of input messages to the process steps of a transaction. In this section, I 
explore the applicability of the atomic broadcast mechanism to the problem of performing 
transactions atomically. I show that that mechanism can be used only when the joint activity
graph of the group of transactions to be performed does not contain a cycle. 

The simple synchronization scheme developed for atomic broadcasting cannot be 
used directly to coordinate a transaction that has an activity graph containing an arc 
connecting two processes. This is because there is no way to describe such a transaction in a 
set of independent messages to be delivered to the data managers as an atomic broadcast.
The process step at the source of an arc must be completed before a message describing the
access to be performed by the process step at the destination of that arc can be formulated. 

One might expect that the atomic broadcast protocol could be modified somehow in
order to synchronize a group of transactions using sequencing of messages between
processes to control the order of process steps. If the joint activity graph of the transactions
does not contain a cycle of arcs, then this can be done, as will be shown in the following
section. If, however, the joint activity graph of the set of transactions to be performed
contains a cycle, any protocol for coordinating the transactions must use some form of
locking, as will be shown subsequently.



4.2.1 Synchronization of Transaction Groups Without Cycles

First I will show how to coordinate a group of transactions whose joint activity graph
contains no cycles. The approach I will use is to modify the message forwarder scheme of
the last chapter to allow a process to act both as a message forwarder and a data manager.
Such a process receives a message and produces a group of messages for its children in the
hierarchy in each process step. The messages produced need not be a simple partitioning of
the message received, but can depend on the local state of the receiving process.

One can perform the transaction depicted in Figure 4.2, for example, by making
process M2 the parent of both M1 and M3 in the hierarchy. The transaction could then be
performed by sending a message describing the transaction to M2 using the atomic
broadcast protocol described in Chapter 3. This message propagates through the hierarchy
until it reaches M2. When this message is received by M2, that data manager performs the
specified updates to its copies of y and z. In the same process step, M2 forwards the portion
of the request relevant to M1, and sends a message to M3 describing the accesses to be
performed on x and z at M3. M2 includes the current value of y in the message sent to M3.
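
A sketch of this arrangement, with M2 acting as both data manager and message forwarder, is given below. The step functions and message fields are hypothetical; each manager's local state is a plain dictionary, and delivery of the child messages is assumed to be handled by the forwarding protocol of Chapter 3.

```python
def m2_step(local, amount):
    """M2's single process step for T: update y and z, then emit the child messages."""
    old_y = local["y"]
    if old_y < amount:
        local["z"] += old_y - amount
        local["y"] = 0
    else:
        local["y"] = old_y - amount
    return {
        "M1": {"amount": amount},              # M1 holds x and y and needs nothing more
        "M3": {"amount": amount, "y": old_y},  # M3 holds z but no copy of y
    }


def m1_step(local, msg):
    local["x"] -= msg["amount"]
    local["y"] = max(local["y"] - msg["amount"], 0)


def m3_step(local, msg):
    local["x"] -= msg["amount"]
    if msg["y"] < msg["amount"]:
        local["z"] += msg["y"] - msg["amount"]
```

Because M3 computes the new value of z from the value of y that M2 saw in the same step in which M2 applied T, the copies of z at M2 and M3 receive the same new value.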

Including some of the data managers as message forwarders in the hierarchy allows
some of the process steps of a transaction to be performed before the input messages sent to
other steps are constructed, while retaining the hierarchical structure of message sending.
Recall that the message forwarder protocol of Chapter 3 ensures that all of the processes,
message forwarders and data managers alike, see a broadcast as atomic. The request to
perform a transaction in this scheme is treated like an atomic broadcast, and thus is seen as
atomic by the data managers.



A group of transactions can be performed atomically with the modified message
forwarder scheme whenever a hierarchy of message forwarders and data managers can be
constructed so that the arcs in the joint activity graph always run from a process to one of its
descendants. This can be done whenever the joint activity graph contains no cycles. In a
later section, I show how assignment of data items to data managers can be chosen so as to
eliminate cycles from the joint activity graph of any expected group of transactions.
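
One way to see that an acyclic joint activity graph always admits such a hierarchy is to order the processes topologically and chain them. The sketch below does exactly that using the standard-library `graphlib` module (Python 3.9 or later); `chain_hierarchy` is a hypothetical name, and the single-chain hierarchy it returns is deliberately degenerate. It is meant only to demonstrate existence, not to suggest an efficient organization.

```python
from graphlib import CycleError, TopologicalSorter


def chain_hierarchy(arcs):
    """Return a child -> parent map in which every arc runs from an ancestor to a
    descendant, or None if the joint activity graph contains a cycle."""
    preds = {}
    for src, dst in arcs:
        preds.setdefault(src, set())
        preds.setdefault(dst, set()).add(src)
    try:
        order = list(TopologicalSorter(preds).static_order())
    except CycleError:
        return None
    # Degenerate but valid: a single chain, each process the parent of the next.
    return {child: parent for parent, child in zip(order, order[1:])}
```

For the graph of Figure 4.2, `chain_hierarchy({("M2", "M3")})` gives `{"M3": "M2"}`, while the cycle between M1 and M2 in Figure 4.3 makes the function return `None`.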

4.2.2 Synchronization of Transactions with Cycles in the Joint Activity Graph

If there is a cycle in the joint activity graph of a group of transactions, then there is
no way to construct a hierarchy so that a process that is the source of some arc is always an
ancestor of the destination process of that arc. Thus the message forwarder protocol cannot
be used. The following paragraphs give an argument to support the claim that any protocol
that correctly coordinates a group of transactions whose joint activity graph contains a cycle
must use locking.

Consider first a group of two transactions that form a cycle, such as T1 and T2 in
Figure 4.3. The execution of a transaction consists of a set of process steps. The arcs in the
joint activity graph indicate that T1 must be performed by a set of process steps in which a
process step of M1 precedes a process step of M2. Similarly, in performing T2, a process step
of M2 must precede a process step of M1.

To perform the two transactions atomically, either both steps performing T1 must
precede both steps for T2, or vice versa. To perform the transactions without locking, recall
from Chapter 2 that at most one process step of each data manager can be used for each
transaction, and that the sequencing of messages between processes is the only restriction
that can prevent a message that has been sent from being received promptly. We must
therefore prevent, somehow, the situation that the process step of T1 at M1 and the process
step of T2 at M2 are both completed before either transaction is completed. This can be
shown to be impossible by demonstrating that this undesirable situation can be forced to
occur in an execution of any protocol for the synchronization of T1 and T2 that does not use
some form of locking.

Consider the state of the system during the execution of T1 in which M1 is
performing its process step of T1. If T2 were begun at this point, the synchronization
protocol must prevent the execution of the process step of M2 related to T2 from preceding
that which accomplishes the completion of T1. Without using locking to control the order in
which messages are received, the only way to control the order in which M2 receives the
messages pertaining to the two transactions so that the undesirable order is avoided is to
have both messages sent by M1, and use sequencing of messages between M1 and M2 to
force the messages to be received in the correct order.

Thus to force the execution of the process step of M2 that completes T1 to precede
that which begins T2, both of these process steps must be triggered by messages sent from M1.
This means that the execution of T2 must include two process steps of M1, one that precedes
the step of M2 and one that follows that step. Using two steps of one process to perform
one transaction is a form of locking, therefore it is impossible to coordinate the cycle of two
transactions without locking.
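
The undesirable situation can be made concrete by enumerating the orders in which the four process steps of T1 and T2 from Figure 4.3 might occur. The sketch below is illustrative only: step names such as `"T1@M1"` and the initial values A = 1, B = 2 are arbitrary choices, and each transaction is modeled as a read step at the source manager followed by an update step at the destination manager.

```python
from itertools import permutations

STEPS = ("T1@M1", "T1@M2", "T2@M2", "T2@M1")


def run(schedule, a=1, b=2):
    """Execute the four process steps in the given order; return the final (A, B)."""
    state = {"A": a, "B": b}
    msg = {}
    for step in schedule:
        if step == "T1@M1":
            msg["T1"] = state["A"]    # M1 reads A and sends it to M2
        elif step == "T1@M2":
            state["B"] += msg["T1"]   # M2 completes T1: B = B + A
        elif step == "T2@M2":
            msg["T2"] = state["B"]    # M2 reads B and sends it to M1
        elif step == "T2@M1":
            state["A"] += msg["T2"]   # M1 completes T2: A = A + B
    return state["A"], state["B"]


def valid(schedule):
    # Within each transaction the source step precedes the destination step.
    return (schedule.index("T1@M1") < schedule.index("T1@M2")
            and schedule.index("T2@M2") < schedule.index("T2@M1"))


serial = {run(("T1@M1", "T1@M2", "T2@M2", "T2@M1")),
          run(("T2@M2", "T2@M1", "T1@M1", "T1@M2"))}
anomalous = [s for s in permutations(STEPS) if valid(s) and run(s) not in serial]
```

Of the six valid orders, the four in `anomalous` are exactly those in which both first steps are completed before either transaction finishes; each leaves A and B in a state that neither serial order could produce.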

This argument can be extended to cycles of any size by demonstrating that unless
locking of some form is used, then it must be possible to reach a state in the execution of the
system in which each transaction has completed a process step in one of the processes in the
cycle, making it impossible to complete the execution of the transactions atomically. We are
left with the conclusion that some other mechanism is needed in order to synchronize a
cycle of transactions. The locking mechanism used in this thesis is explicit locking. This
locking mechanism consists of delaying the reception of a message until some other message
from some other process is received. 1 Locking is to be avoided wherever possible, because a
failure of the sender of the expected message, or of the communication network, may delay
processing of messages from other sources. This violates our goal of partial operability, as
now a group of functioning sites cannot necessarily carry out a transaction purely local to
those sites, because one of the processes involved in the transaction may be locked, waiting
for a message from some other site. This problem will be considered in greater detail in
Chapter 5.

The particular mechanism that I will use for locking in the process model is to place
a prerequisite on the process step specification of a process step. A pre-requisite is a
predicate that may include variables in the local state of the process. A process step is not
performed unless the pre-requisite for that step is satisfied. By placing a prerequisite on all
process steps that receive messages from one of the input ports of a process, one can inhibit
the reception of messages at that port until some condition is met.
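
A minimal sketch of this locking mechanism follows. The `Process` class, its `step` method, and the `locked_for` state variable are hypothetical; the point is only that a process step receives from a port when, and only when, a predicate over the local state is satisfied.

```python
from collections import deque


class Process:
    """Hypothetical process whose steps fire only when their prerequisite holds."""

    def __init__(self):
        self.state = {"locked_for": None}  # local state visible to prerequisites
        self.ports = {}                    # port name -> queue of pending messages
        self.log = []                      # messages received, in order

    def send(self, port, message):
        self.ports.setdefault(port, deque()).append(message)

    def step(self, port, prerequisite=lambda state: True):
        """Receive one message from `port` if the prerequisite is satisfied."""
        queue = self.ports.get(port)
        if not queue or not prerequisite(self.state):
            return False  # reception inhibited; the message stays queued
        self.log.append(queue.popleft())
        return True
```

A port guarded by `lambda state: state["locked_for"] is None` holds its messages while the process is locked on behalf of some transaction and releases them, in order, once the lock is cleared.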

With this locking mechanism, we can now extend the transaction synchronization
mechanism in the previous section to coordinate arbitrary groups of transactions. 



1. Note that in sequencing, it is possible that the processing of a message is postponed, but
only until a message sent from the same sending process is received.



4.3 Classes of Transactions

On the basis of the activity graph of a transaction, we can group transactions into 
three classes. These classes reflect the difficulty of correctly synchronizing the transactions, in
terms of the mechanisms needed. 

4.3.1 Transactions with Independent Components 

The simplest class is that of transactions whose activity graphs contain no arcs. I
refer to these transactions as transactions with independent components. A transaction
with an activity graph with no arcs can be performed atomically using only sequencing by
using the hierarchical protocol described in the previous section. Such a transaction places
no constraints on the organization of the hierarchy, as any hierarchy can be used. The
hierarchy can be chosen to optimize locality of reference, without concern for introducing the
need for locking in these transactions.

An example of such a transaction would be a transaction which adds Bt interest to
all of the savings accounts in a bank. The new value of each account depends only on its
previous value. No matter how the accounts are distributed among data manager processes,
each manager can compute the new balances of the accounts that it holds solely from their
previous balances. 1
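The independence of the components can be illustrated with a short Python sketch. The manager names, account names, and the interest rate below are hypothetical values chosen only for the example; the point is that `add_interest` reads nothing outside the accounts held by one manager.

```python
from fractions import Fraction

def add_interest(accounts, rate):
    """New balances for one manager, computed only from its own old balances."""
    return {name: balance + balance * rate for name, balance in accounts.items()}

# Hypothetical assignment of accounts to two data managers.
managers = {
    "manager_a": {"acct1": Fraction(100), "acct2": Fraction(250)},
    "manager_b": {"acct3": Fraction(40)},
}
rate = Fraction(1, 20)   # illustrative rate, not taken from the text

# Each manager's update uses no data from any other manager, so the
# activity graph of the transaction has no arcs.
updated = {name: add_interest(accounts, rate) for name, accounts in managers.items()}
```

Any other partition of the three accounts among managers would give the same balances, which is why such a transaction places no constraint on the hierarchy.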

It is instructive to see just how large this class of transactions is. All "query 
transactions", which do not perform any updates to the data base, fall into this class. A 
query transaction can always be performed by sending out a set of requests to the data 



1. In this simple example, I have deliberately ignored other processing that such a
transaction may be required to perform in an actual banking system, such as accumulating a 
total of the accounts or of the interest paid. 



-100- 

managers as an atomic broadcast in order to obtain a consistent "snapshot" of the data base. 1 
Such requests can be sent as an atomic broadcast, using the mechanism of Chapter 3, in 
order to obtain a snapshot that reflects either all or none of the effects of any other 
transaction. The sender of the requests can then gather the replies and use them to satisfy 
the query. 2

A second class of transactions that always have independent components are 
transactions that only make updates to the database. If the new values that a transaction 
gives to items are completely independent of the previous state of the data base, such a 
transaction has independent components. 

A third class of transactions that always have independent components are
transactions in a fully redundant data base, such as that considered in [Rothnie77, Thomas76].
Many of the protocols that have been developed for synchronization of transactions in a
distributed data base work only for the fully redundant case. This point suggests that
synchronization of transactions in a fully redundant data base may be somehow easier than
synchronization in a data base in which each site holds only a partial subset of all of the
data items. In fact, the fully redundant case is easier, because all of the transactions in a
fully redundant data base have independent components, allowing synchronization to be
accomplished without locking.



1. If the data needed to satisfy a query cannot be accurately predicted in advance, this may
be a very large set of requests. An example of such a query would be "tell me the value of
the record that this record points at."

2. Alternatively, the requests can ask the managers to make copies of the data items 
involved in the query, and the copies can be processed to satisfy the query in any efficient
manner. 



- 101 - 
Actually, a fully redundant data base violates the assumption made about locality of 
reference, as all transactions that update the data base involve all of the sites. All such 
transactions must be sequenced by the root node of any hierarchy used for the hierarchical 
synchronization scheme. 1 A much more interesting case is that of a data base that is not 
completely redundant, but still has the property that all of the input items to a transaction 
exist on any site that holds an item updated by that transaction. All transactions in such a
system have independent components, and may also exhibit locality of reference. 

4.3.2 Transactions With Predictable Data Flow 

A second class of transactions based on activity graphs is those with activity graphs 
with well defined arcs. I call this the class of transactions with predictable data flow. Some 
of the process steps that perform such a transaction must be completed before the input 
messages to other steps can be produced. A transaction in this class cannot be performed 
atomically using the atomic broadcast scheme in every hierarchy, but instead requires that 
each process that is the source of some arc in its activity graph be an ancestor of the process 
that is at the destination of that arc. 

An example of such a transaction would be the simple check cashing transaction 
depicted in Figures 4.1 and 4.2. Any implementation of this transaction requires that an 
access to the item y precede the access that updates the value of z. 
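The ancestor requirement can be checked mechanically. The following Python sketch assumes a hierarchy given as a child-to-parent map and an activity graph given as a list of (source, destination) arcs; the process names `y_mgr` and `z_mgr` are hypothetical stand-ins for the managers of items y and z.

```python
def ancestors(parent, node):
    """Proper ancestors of node, given a child -> parent map (the root has no entry)."""
    found = set()
    while node in parent:
        node = parent[node]
        found.add(node)
    return found

def arcs_needing_locking(parent, arcs):
    """Arcs whose source is not an ancestor of their destination in this hierarchy."""
    return [(src, dst) for src, dst in arcs if src not in ancestors(parent, dst)]

arcs = [("y_mgr", "z_mgr")]          # the value read at y feeds the update of z

# Hierarchy A: the manager of y is the parent of the manager of z.
hierarchy_a = {"y_mgr": "root", "z_mgr": "y_mgr"}
# Hierarchy B: the two managers are siblings under a common forwarder.
hierarchy_b = {"y_mgr": "root", "z_mgr": "root"}

ok_in_a = arcs_needing_locking(hierarchy_a, arcs)
ok_in_b = arcs_needing_locking(hierarchy_b, arcs)
```

In hierarchy A the transaction can be run with sequencing alone; in hierarchy B the single arc is reported, so some additional mechanism would be needed for that transaction.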



1. Note, however, that query transactions can always be implemented as being local to one 
site and run efficiently without locking. 



-102- 
4.3.3 Unpredictable Transactions

A third class of transactions, partially distinguished from the second, is those for
which it is impossible to predict which items will be accessed and how until some of the
accesses are performed. If we were to construct an activity graph for a transaction whose
access pattern is completely unpredictable, that graph must include an arc between each pair
of managers. Such a transaction would cause a great many cycles when included in a
joint activity graph, even though the probability that each arc is used in any particular
invocation of the transaction would be small. This suggests that such transactions need
special consideration so that they do not add to the cost of performing more predictable
transactions.

An example of such a transaction would be a transaction following a linked list of 
records, performing some processing on each entry in the list. For such a transaction, it is
impossible to predict which records will be accessed before the transaction is run. The 
transaction could potentially access any record in the file containing the linked list, and 
might transfer information from any of those records to any other record. 

It should first be noted that unpredictability comes in degrees. Frequently, one can 
limit the set of items that a transaction could access, for example to the records in a 
particular file. Even relatively crude bounds can reduce the number of arcs in a 
transaction's activity graph to the point where it could reasonably be treated as predictable. 
The assignment of data items to managers can greatly affect the impact of unpredictable 
transactions. If all of the items that could be targets for accesses of such a transaction are 
under the control of a single manager, the unpredictability disappears. Thus if 
"unpredictable" transactions are frequent, the choice of the assignment of data items to 
managers should be made with this in mind. 



- 103 - 

The three classes of transactions discussed above are a categorization of transactions
according to the difficulty of performing them atomically. I am assuming, and this
assumption appears to be consistent with current practice, that the most frequent transactions
will be those of the first two classes. In fact, in many current applications of distributed
information systems queries are much more frequent than updates, making the transactions
with independent components the most frequent. With this assumption in mind, I have
designed a mechanism to provide correct synchronization for all three classes of transactions
that is substantially more efficient and robust for transactions in the first two classes. This
mechanism is the subject of the next section.

4.4 A Hierarchical Scheme for Transaction Synchronization 

In this section, I present a mechanism for synchronization of transactions in a
distributed information system that makes extensive use of the ideas developed above and in
Chapter 3. The mechanism is described in terms of restrictions on the patterns of message
passing that can occur during the execution of a transaction. In the next section, I consider
the implementation questions in greater detail.

The mechanism that I will use for synchronizing transactions is an extension of the 
message forwarder mechanism described in Chapter 3. The processes are organized in a 
hierarchy including both data managers, which hold items, and message forwarders, which 
merely relay messages. Some of the data managers may act as message forwarders as well.
Each process in this hierarchy now has two types of ports: a front door port, and some back
door ports. The front door ports are used for receiving requests pertaining to new
transactions, while the back door ports provide a mechanism that allows a process to receive
additional messages pertaining to the current transaction without enabling reception of



-104- 
requests from new transactions. This mechanism, together with the use of pre-requisites on
process steps, will be used for locking.

A transaction can be initiated by any process by formulating a message describing 
the accesses to be performed. This message invokes a set of process steps that together 
perform the intended transaction. Some of these process steps are invoked by messages 
received at the front door of some process, while others are invoked by back door message 
reception. Messages sent to the front door of some process must follow a similar protocol to 
that used in atomic broadcasting: 

Messages sent to the front door may only be sent to the relatives (in 
the hierarchy) of the sender. 

A process receiving a message from one of its children through the
front door may either send the message intact to the front door of its
parent, without modifying its local state, or it may perform any
processing desired on the message and generate messages for the
front doors of its children.

A process receiving a message from its parent through the front 
door may perform the desired processing and send messages to the 
front door ports of its children. 

Messages sent to the front door follow the direct route in the hierarchy between the 
process that initiates a transaction and the data managers that perform the transaction. The
same argument that was used to prove that the hierarchy of message forwarders correctly
synchronizes atomic broadcasts can be used to show that the transactions are atomic as
ordered by front door message receptions. Not all transactions can be performed entirely 
with front door message receptions, however. A back door message is required whenever the 
process step to be performed by one process depends on data held by some other process that 
is not one of its ancestors. In order to prevent the steps invoked by back door messages from 



-105- 
introducing ordering relationships that would make transactions non-atomic, several 
restrictions must be applied to back door messages: 

Any process involved in a transaction may send a message to the
back door port of any other process involved in that transaction.

The reception of a message at the back door of a process in
conjunction with some transaction must be preceded by the
reception of a message concerning that transaction at the front
door of that process.

No steps receiving messages at the front door of a process can occur
between the step that receives a message at the front door about the
transaction and the steps that receive messages at the back door
about the same transaction.

These restrictions taken together insure that all of the steps of a process related to a
particular transaction are consecutive and that the first step of each process related to a
transaction is invoked through the front door. Thus the ordering of transactions as
observed through all message receptions is the same as that observed only through the
reception of messages at the front door ports, and thus the transactions are performed
atomically.

The restrictions on back door messages require advance planning before a back door
message can be sent. A process must know to expect a back door message from a transaction
so that it will not receive any messages from other transactions before getting the back door
message. Thus the message sent to the front door of a process that will subsequently be sent
a back door message must contain a lock request which causes that process to stop receiving
messages at its front door until the expected back door message is received. The next section
describes how the messages are constructed and routed to achieve this effect.



This section discusses several details related to the implementation of the hierarchical
locking scheme. First, I show how the messages needed in the implementation of a
transaction are constructed from the description of the transaction. This is the responsibility
of the transaction process, though the individual data managers must also send messages as
outputs of the process steps that they perform. Another issue discussed is the coordination of
messages sent to the back door ports to conform to the rules described in the previous
section. An efficient implementation is described in which lock requests can be sent to a
large number of processes without actually delivering the requests in most cases. This
implementation makes it practical to run unpredictable transactions using this mechanism.
Finally, I discuss the problem of choosing the hierarchy of processes. This hierarchy should
be chosen so as to conform closely to the physical communication network topology, to reflect
"locality of reference" in the transactions to be run, and to minimize ...

4.5.1 Constructing the Messages 

We must now show how a transaction process can perform its function of translating
from a high level description of a transaction to be performed into a message that will
eventually cause the desired transaction to be performed in accordance with the
synchronization rules described above. I will first consider the class of transactions with
predictable data flow, for which it is possible for the transaction process to know the set of
accesses to be performed (or at least the manager processes that perform those accesses) in
advance. Later, I will show how the scheme can be extended to transactions with
unpredictable flow.



-107- 
For a transaction with predictable data flow, the transaction process can know in 
advance what accesses are to be performed and thus could formulate a message to be 
distributed to the managers describing those accesses. Two steps must be carried out in 
formulating the set of messages. First, the accesses to be performed must be derived from the 
high level description of the transaction, and then the set of messages to be distributed to the 
managers must be constructed. The first of these tasks is performed by the transaction
process. 

The second task requires knowledge of the hierarchy as well as knowledge of the
transaction. We could require each transaction process to have this knowledge, allowing it to
formulate the set of messages described below; however, it seems more natural to delegate this
task to a process in the hierarchy that is above all of the data managers that are to
participate in the transaction.

The transaction process formulates a description of the transaction that describes the 
accesses to be performed. This description may be similar to a transaction graph for the 
transaction. The transaction process then sends a message containing this description to its 
parent in the hierarchy. 

Each process receiving such a description of a transaction to be performed examines 
the description to determine whether or not all of the data items accessed by the transaction 
are held by data managers that are below the receiving process in the hierarchy. If not, the 
description is forwarded intact to the parent of the receiver. If all of the data managers that 
are to participate in the transaction are below the receiving process in the hierarchy, the
receiving process has the knowledge to generate the messages necessary to perform the 
transaction. The receiving process formulates a description of the transaction in a set of 



-108-

components, one for each data manager that contributes to performing the transaction:

Each manager is given a description of the accesses that it is to
perform. This description may be at any level that is
understandable to the data manager, the transaction process, and
the process constructing the description.

Each manager that is to receive back door messages is in addition
given a lock request, which describes the nature of the expected
back door messages.

Each manager that must produce back door messages is given a
description of what messages are to be produced and their
destinations.

Each manager that must produce input for its descendants in the
hierarchy (because of an arc in the transaction graph) is given a
description of the input to be produced.

The process constructing this description then treats it like a message that it has
received through its front door containing component requests to be distributed. Each such
message is processed as follows:



A process M receiving a message through its front door port examines that message
to see if it contains a component for M. If not, the message is partitioned according to the
message forwarder algorithm of Chapter 3 and distributed to the children of M. If the
message contains a component for M, then M takes action on the message.
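The upward forwarding of a transaction description and the downward partitioning of its components can be sketched as follows. This is an illustrative Python sketch, not the algorithm of Chapter 3: the seven-process hierarchy, the names `find_coordinator` and `partition`, and the treatment of a process as being "below" itself are all assumptions made for the example.

```python
def subtree(children, node):
    """All processes at or below node (a process counts as below itself here)."""
    found = {node}
    for child in children.get(node, []):
        found |= subtree(children, child)
    return found

def find_coordinator(parent, children, start, managers):
    """Forward the description upward until every participant is below the receiver."""
    node = start
    while not set(managers) <= subtree(children, node):
        node = parent[node]          # assumes all managers exist in the hierarchy
    return node

def partition(children, node, components):
    """Split a message's components among the children of node that lead to them."""
    parts = {}
    for child in children.get(node, []):
        below = subtree(children, child)
        part = {mgr: comp for mgr, comp in components.items() if mgr in below}
        if part:
            parts[child] = part
    return parts

# Hypothetical hierarchy: M4 is the root, M2 and M6 are its children.
children = {"M4": ["M2", "M6"], "M2": ["M1", "M3"], "M6": ["M5", "M7"]}
parent = {c: p for p, cs in children.items() for c in cs}

local = find_coordinator(parent, children, "M1", ["M1", "M3"])
wide = find_coordinator(parent, children, "M1", ["M1", "M5"])
split = partition(children, wide, {"M1": "read value", "M5": "lock request"})
```

A transaction touching only M1 and M3 is handled at M2 and never reaches the root, which is the locality of reference the hierarchy is meant to exploit.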



The action taken depends on whether or not the component of the message destined
for M contains a lock request. If it does not, then M performs whatever access is specified (it
is guaranteed to have sufficient information to do so), possibly modifies the other
components of the message to include data values to be used by its descendants, and
distributes the components of the message to its children according to the atomic



-109- 
broadcasting protocol. Any back door messages to be sent by M are also sent by the same
process step.

If the message contains a lock request for M, then M cannot perform all of its
accesses until it receives additional information. Some of the accesses to be performed by M
depend on receiving additional information from some other process. M distributes the
components of the message to its children (possibly modifying some of the components to
include values of data items held by M), and sends any back door messages that are
requested. M then stops receiving messages at its front door until necessary back door
messages are received. When M receives all of the back door messages associated with the
transaction that sent the lock request, it can perform the specified accesses and re-enable
message reception through the front door.

Some care must be taken with back door messages to avoid confusion. The back
door messages of several concurrent transactions for some process may become intermingled,
causing a back door message to arrive at a process before the corresponding lock request.
The simplest solution to this problem is to use a separate back door port for each
transaction. The transaction process initiating a transaction chooses an identifier for each
transaction. This identifier can be combined with a process name to form a unique back
door port for each transaction and each process involved in that transaction. A process that
has received a lock request can then enable message reception only through the back door
for the particular transaction being performed.
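The per-transaction back door ports can be sketched in Python as below. The class `Manager`, the tuple form of the port name, and the simplification that each lock waits for exactly one back door message are assumptions of this sketch, not details given in the text.

```python
from collections import deque

class Manager:
    """Toy data manager with one front door and one back door port per transaction."""

    def __init__(self, name):
        self.name = name
        self.front = deque()        # (transaction id, carries lock request?)
        self.back_ports = {}        # (process name, transaction id) -> deque
        self.locked_by = None
        self.performed = []

    def back_port(self, txn):
        # Port name formed from the process name and the transaction identifier.
        return self.back_ports.setdefault((self.name, txn), deque())

    def step(self):
        """Perform one enabled process step. Returns True if a step ran."""
        if self.locked_by is not None:
            port = self.back_port(self.locked_by)
            if not port:
                return False        # locked: the front door stays closed
            self.performed.append((self.locked_by, "back", port.popleft()))
            self.locked_by = None   # assumes one back door message per lock
            return True
        if not self.front:
            return False
        txn, lock_request = self.front.popleft()
        self.performed.append((txn, "front", None))
        if lock_request:
            self.locked_by = txn
        return True

m = Manager("M5")
m.back_port("T1").append(42)        # back door message arrives early
m.front.append(("T2", False))
m.front.append(("T1", True))        # lock request for T1 arrives later
while m.step():
    pass
```

The early back door message simply waits at its own port; it is not seen until the front door message for T1 has set the lock, so the steps of T1 at this manager remain consecutive.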



-110-

4.5.2 Implementation of Unpredictable Transactions

Two problems must be overcome in applying this mechanism to transactions with
unpredictable flow. Each process that could receive a back door message in performing a
transaction must be sent a lock request. Because the set of data managers involved in a
transaction cannot be predicted until some of the transaction has been performed, all
potential participants in the transaction must initially be sent lock requests. This allows the
accesses that cannot be predicted in advance to be performed by sending messages to the
back door of the appropriate manager when the access to be performed is known. The
component request sent to a manager may cause that manager to send and receive back door
messages dependent on the items that the manager holds and the information it receives in
the back door messages. Any transaction can be performed in this way.

The second problem comes in determining when a transaction has been completed, so
that the data managers sent lock requests can release those locks. Because the set of accesses
to be performed is not known in advance, a process that has received a lock request does not
know when it has received all of the messages connected with the transaction that it will ever
receive and thus when to release its lock. Each manager must remain locked until it has
received all of the messages that pertain to the transaction that sent the lock request.

A simple solution to the problem of determining when an unpredictable transaction 
has been completed is to have some process monitor the progress of that transaction. When 
the transaction has been completed, the monitoring process can send out back door messages 
to all of the recipients of locks to release the locks. This strategy may sound very inefficient, 
however the next section discusses the problem of distributing the lock requests, and 
describes an efficient implementation of locking for unpredictable transactions based on the 
approach of this section. 



- Ill - 

The progress of an unpredictable transaction is monitored by having each process
that finishes some portion of the transaction report to a monitor process. Any process can act
as the monitor for a transaction, however communication will probably be minimized if the
highest process in the hierarchy involved in performing the transaction performs the
monitoring function.

Each message (front door and back door) sent in performing a transaction carries a
completion weight. A process initiating a transaction arbitrarily assigns completion weights
to the messages that it sends so that these weights sum to one. Each process step
redistributes the completion weight of the message that it receives among the messages
produced by that step. No message is ever assigned a completion weight of zero, and every
message sent by each process is given some completion weight. 1 If a step produces no output
messages for other processes involved in the transaction, it instead produces an output
message for the monitor containing the entire completion weight received at that step. Thus
completion weights are gradually returned to the transaction monitor process as the various
process steps of the transaction are completed. The transaction is done when the completion
weights in the messages sent to the monitor sum to one. 2
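The bookkeeping for completion weights can be sketched with exact rational arithmetic. The Python below uses the standard `fractions.Fraction` type; the class `Monitor`, the helper `split`, and the equal division of weight among outgoing messages are assumptions of the sketch (the text allows any non-zero division).

```python
from fractions import Fraction

class Monitor:
    """Collects completion weight returned by finished process steps."""

    def __init__(self):
        self.returned = Fraction(0)

    def report(self, weight):
        self.returned += weight

    def done(self):
        return self.returned == 1

def split(weight, n):
    """Divide a weight into n equal, non-zero shares that sum exactly to it."""
    return [weight / n for _ in range(n)]

monitor = Monitor()

# The initiator sends three messages whose weights sum to one.
to_a, to_b, to_c = split(Fraction(1), 3)

# The step receiving to_a produces two further messages and passes its weight on.
a1, a2 = split(to_a, 2)

# Steps that produce no further messages return their whole weight to the monitor.
for weight in (to_b, to_c, a1):
    monitor.report(weight)
still_running = not monitor.done()    # weight a2 is still outstanding

monitor.report(a2)
finished = monitor.done()
```

With floating point weights the final comparison against one could fail through accumulated rounding; with rationals the sum is exact however the weight is subdivided.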



1. An optimization of this scheme would be to recognize the special case of a message
containing only lock requests. In performing an unpredictable transaction, many of the lock
requests that are sent may be completely unnecessary and need not be delivered before the
transaction is completed. We can speed up the recognition of the completion of the
transaction by assigning any message containing only lock requests a completion weight of
zero. If the locks in that message are necessary, some other message with a non-zero
completion weight will be forced to wait until the necessary locks are received. The next
section describes a scheme in which the number of messages that must be sent to perform an
unpredictable transaction can be substantially reduced by postponing or eliminating delivery
of unnecessary lock requests.

2. The arithmetic on completion weights must be done carefully so as to avoid losing or
gaining completion weight due to round off error. One could maintain completion weights
as rational numbers rather than as floating point numbers so as to avoid round off error.






-113- 
through the hierarchy, being forwarded only when "pushed" by subsequent messages or the
completion of the transaction setting the lock. 

This can be accomplished by slightly modifying the implementation of the processes
in the hierarchy. Recall that each such process maintains a queue of messages to be
delivered to each of its relatives in the hierarchy. The implementation described in Chapter
2 attempted to forward messages from each of these queues whenever they were non-empty.
One could instead construct the implementation so that messages are forwarded from a
queue only when the queue contains a message which is not purely a lock request.
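A possible shape for such a queue is sketched below in Python. The class `ForwardQueue` and the `lock_only` flag are hypothetical, and the choice to leave lock requests queued behind the last ordinary message is one reading of "pushed" rather than something the text states.

```python
from collections import deque

class ForwardQueue:
    """Queue of messages awaiting forwarding to one relative in the hierarchy."""

    def __init__(self):
        self.pending = deque()

    def enqueue(self, message):
        self.pending.append(message)

    def drain(self):
        """Forward, in order, everything up to the last message that is not
        purely a lock request. Pure lock requests behind it stay queued."""
        last = max(
            (i for i, m in enumerate(self.pending) if not m["lock_only"]),
            default=-1,
        )
        return [self.pending.popleft() for _ in range(last + 1)]

q = ForwardQueue()
q.enqueue({"txn": "T1", "lock_only": True})
held = q.drain()                      # nothing pushes the lock request yet

q.enqueue({"txn": "T2", "lock_only": False})
q.enqueue({"txn": "T3", "lock_only": True})
sent = q.drain()                      # T2 pushes the T1 lock request ahead of it
```

Forwarding the earlier lock request together with the message that pushes it keeps the queue order intact, which the sequencing argument depends on.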

Consider the hierarchy and transaction depicted in Figure 4.4. The transaction uses
the values of data stored at M1 to update data at M5. M5's only participation is to take the
value produced by M1 and use it in an update. This transaction would be implemented by
sending a message containing components for both M1 and M5. When this request reaches
M4, these components are separated. The component for M1 travels quickly down the
hierarchy to its destination. The component for M5, however, contains only the lock request,
and will not be sent from M4 to M6 until pushed by additional requests. Thus it is likely
that while M1 is computing the value to be sent to M5, the lock request will be held up
awaiting delivery to M6. This allows M5 to continue to participate in transactions local to
the right hand half of the hierarchy while T is being performed at M1.

This example raises another problem that must be solved: that of insuring that the 
lock request for a transaction does arrive eventually, and arrives before the back door 
messages of the transaction. This problem is partially solved by using a unique port for 
receiving back door messages related to each transaction, as once the back door message 
arrives, it will wait at its unique port until the corresponding lock request arrives. It would



[Figure 4.4: The Hierarchy]





-115- 
be desirable if the lock request could be delivered promptly once the transaction has 
produced a message for the back door port. This can be achieved in one of two ways. 

The implementation of a process could notice when a message is waiting at the back
door port and send a request up the hierarchy to forward any lock requests. This strategy
would be effective, but may require additional message sending. If the communication
network topology closely corresponds to the synchronization hierarchy, a second strategy may
be more effective.

If the communication network topology closely parallels the topology of the hierarchy,
then any message, including the back door messages, must essentially flow along the arcs in
the hierarchy to reach its destination. We can take advantage of this fact to provide for
prompt forwarding of lock requests when appropriate. Each process that is not a leaf node
in the hierarchy now has a third type of port, a pass through port. Each such process is
always ready to receive messages at its pass through port, and pass them on to one of its
relatives in the hierarchy. The pass through ports provide a mechanism to send back door
messages from one process to another in the hierarchy through intervening processes. Each
such message is identified with its ultimate destination and sent to the pass through port of
the parent of the sender. When a process receives a message at its pass through port, it
passes it on either to the pass through port of one of its relatives or, if the ultimate
destination is a relative of the receiving process, to the ultimate destination port.
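The hop-by-hop path of a back door message through pass through ports can be traced with a short Python sketch. The seven-process hierarchy and the function `route` are hypothetical; the sketch takes "relatives" to mean the parent and children of a process, and assumes the sender is not the root and the destination exists in the hierarchy.

```python
def subtree(children, node):
    """All processes at or below node."""
    found = {node}
    for child in children.get(node, []):
        found |= subtree(children, child)
    return found

def route(parent, children, sender, destination):
    """List of (process, port) hops taken by a back door message."""
    hops = []
    node = parent[sender]                       # first hop: the sender's parent
    while True:
        hops.append((node, "pass through"))
        relatives = set(children.get(node, []))
        if node in parent:
            relatives.add(parent[node])
        if destination in relatives:            # deliver to the destination port
            hops.append((destination, "back door"))
            return hops
        downward = [c for c in children.get(node, [])
                    if destination in subtree(children, c)]
        node = downward[0] if downward else parent[node]

# Hypothetical hierarchy: M4 is the root, M2 and M6 are its children.
children = {"M4": ["M2", "M6"], "M2": ["M1", "M3"], "M6": ["M5", "M7"]}
parent = {c: p for p, cs in children.items() for c in cs}

path = route(parent, children, "M1", "M5")
nearby = route(parent, children, "M1", "M3")
```

Every intervening process on the path sees the message, which is what later allows a waiting lock request to be matched against it.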

The pass through ports provide a mechanism to allow each process in the path of a
back door message to notice its progress. A process can match lock requests with back door
messages that it is also forwarding by the unique port ID of the destination back door port.
When a process has a back door message to be forwarded to one of its relatives, it checks its
queue of front door messages to be forwarded to the same relative for a corresponding lock
request. If such a request is found, the back door message is combined with that
request and left in its place in the queue of front door messages. This will cause that request to be
promptly forwarded, as it is now not solely a lock request.

One point of caution should be noted. If the lock request is contained in a message
that has been forwarded but not yet acknowledged, it cannot be combined with the
back door message and both must be forwarded independently. This is because the process
to which the lock request had been sent may have already received it, so that it is too late to
modify it. Thus each process must keep track of messages that have been forwarded but not
yet acknowledged.
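The matching of a back door message against queued lock requests, including the caution about messages already forwarded, can be sketched in Python. The function `offer_back_door`, the dictionary layout of a message, and the separate `in_flight` list are assumptions made for illustration only.

```python
def offer_back_door(front_queue, back_msg):
    """Try to combine a back door message with a lock request still queued here.

    front_queue holds only messages that have NOT yet been forwarded; messages
    forwarded but unacknowledged are kept elsewhere and are never modified.
    Returns "combined" or "independent".
    """
    for queued in front_queue:
        if queued["port"] == back_msg["port"]:
            queued["payload"] = back_msg["payload"]
            queued["lock_only"] = False     # now eligible for prompt forwarding
            return "combined"
    return "independent"                    # forward through the pass through port

# Port IDs are (process, transaction) pairs; the names are hypothetical.
waiting = [{"port": ("M5", "T"), "lock_only": True, "payload": None}]
in_flight = [{"port": ("M7", "T"), "lock_only": True, "payload": None}]

first = offer_back_door(waiting, {"port": ("M5", "T"), "payload": 17})
second = offer_back_door(waiting, {"port": ("M7", "T"), "payload": 3})
```

The second message finds no match because its lock request has already left the queue, so it travels on its own and waits at the unique back door port if it overtakes the lock request.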


In the example of Figure 4.4, when M1 has finished computing the value that it
sends to M5, it sends it as a back door message. This message propagates up the hierarchy
through the pass through ports of M2 and M4. When M4 attempts to forward this message
to M6, it notices the corresponding lock request, combines the back door message with the
lock request, and sends the combined message to the front door of M6. When M5 receives
the combined message, it performs the specified update, and realizes that its role in the
transaction is finished, setting and releasing a lock in the same process step. If, however,
M5 were required to receive additional messages in carrying out the transaction, it would
remain locked until those messages were received.

Pass through ports also provide a mechanism to optimize the execution of
unpredictable transactions. In an unpredictable transaction, a great many processes may be
sent lock requests, and later lock releases, and not participate in performing the transaction.
Using the scheme for forwarding lock requests described above, most of these requests will
not be delivered until the transaction has been completed, and will await forwarding at some
level of the hierarchy. Thus while an unpredictable transaction may send out a great many






-117- 
locks, few will actually be received. When the transaction is completed, however, lock release
messages will be sent out for all of the participants in the transaction. Because these are sent
out as back door messages, the processes forwarding the lock release messages will attempt to
combine them with the lock requests still awaiting forwarding. When a lock request is
combined with a lock release, it is known that the lock is unnecessary and both messages can
be discarded.
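The cancellation of an undelivered lock request by its lock release can be sketched as follows. The function `apply_release` and the message layout are hypothetical; the sketch only cancels a request that is still purely a lock request and still queued at the forwarding process.

```python
def apply_release(front_queue, release):
    """Try to cancel a queued, still-undelivered lock request.

    Returns True if the lock request and the release annihilate each other,
    False if the release must be forwarded like any other back door message.
    """
    for index, queued in enumerate(front_queue):
        if queued["lock_only"] and queued["port"] == release["port"]:
            del front_queue[index]
            return True
    return False

# Lock requests for transactions T and U, held at some level of the hierarchy.
queue = [
    {"port": ("M7", "T"), "lock_only": True},
    {"port": ("M3", "U"), "lock_only": True},
]

cancelled = apply_release(queue, {"port": ("M7", "T"), "kind": "release"})
must_forward = not apply_release(queue, {"port": ("M5", "T"), "kind": "release"})
```

The manager M7 in this example never learns that it was a potential participant in T, so it is never locked on behalf of T.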

Using this implementation, it is likely that most lock requests will be retained at a
high level in the hierarchy. Most of the unnecessary lock requests will be canceled at a high
level, before much effort has been expended in delivering them to their destinations. This
implementation makes it practical to run transactions that are very uncertain and must lock a
large number of managers but in fact perform very few accesses. If, as assumed throughout
this thesis, most of the transactions involve managers with a common parent at a low level of
the hierarchy, then running a transaction that sets many unnecessary locks interferes very
little with the execution of most of the transactions, as the lock requests that are not needed
never reach the level in the hierarchy at which they would interfere with the more frequent
transactions.

4.5.4 Choosing the Hierarchy 

Several considerations should guide the choice of a synchronization hierarchy for a 
distributed information system. The hierarchy should reflect the patterns of locality of 
reference in the expected transactions. There are frequently natural boundaries of the 
application, such as the local and regional offices of an organization using a distributed
information system for inventory control, which can guide this choice. 



-118- 

... communication required. If the hierarchy exactly matches the network, the communication
required ... is minimized.

In many cases, the topology of the communication network closely parallels the
patterns of locality of reference. This is because it makes sense for the sites that must
communicate most frequently to be directly connected, or to be connected to a shared
broadcast network. One notable exception is systems in which the sites are connected
through a common carrier, such as the ARPANET or ... In these cases, many
independent distributed information systems share the same communication network. The
patterns of locality of reference in the ... the
communication network as a whole. Thus the topology of the network is not likely to ...

A third factor in the choice of the hierarchy is the capacity and reliability of individual
sites. Given some reasonable approximations for the expected transactions, one can estimate
the volume and importance of the message traffic through each site. These should be used
in evaluating whether or not a particular organization is suitable, by insuring that each site
has sufficient capacity to handle the expected [...]
transactions do not depend on the availability of a site that is known to be unreliable.



Another factor to be considered is the desire to avoid locking. Locking is
undesirable both because it increases the number of messages that must be sent (lock
request messages), and because it violates the goals of autonomy and partial operability. A
process that has received a lock request is dependent on other processes to complete the
transaction and release the lock before it can continue processing other transactions. In the
next chapter, I will present a mechanism that provides a solution to this problem, allowing a
process that has received a lock request to continue processing other transactions before the
outcome of the transaction sending the lock is known. It is still desirable, however, to reduce
locking, and to choose the hierarchy so that frequent transactions do not require locking, and
processes managing frequently used data are rarely locked.

4.6 A Rejected Alternative Solution 

This solution is of course only one of many that could be used for the problem of 
controlling transactions. There are several solutions that provide correct synchronization 
with simpler protocols. In this section, I discuss briefly one of these alternatives and the
reasons for its rejection.

Considerable complexity is introduced into the scheme by the ability to begin a
transaction at any level of the hierarchy. If we had required all transactions to begin with a
request sent to the root of the hierarchy, it would be easy to lock a large portion of the
hierarchy in order to perform some transaction. This could be done as follows:

When a process receives a message, it performs whatever action is required of itself,
and passes on the components of that message to its children as before. If the process
requires input from one of its children to complete its requested access, or if one of the
requests forwarded cannot be completed solely based on the information in that request, the
process sets a lock and stops receiving new messages until it can complete its requested action
and distribute all of the necessary information to each of its children. Each process makes a
local decision about locking and there is no difficulty detecting when a transaction has been
completed.




Figure 4.15
Concurrency Restrictions Due to Hierarchical Structure
[Two panels: The Hierarchy; A Joint Activity Graph of the Transactions]



4.7 Conclusion and Summary

Several points about this solution should be noted. One is that a high number of
transactions can be performed without locking. The hierarchy can be arranged so that the
transactions expected to be most frequent do not require locking. Without locking, the
problem of deadlock detection and prevention, and the "distributed atomic update problem"
described in the next chapter, do not arise.

A second point is that deadlock is impossible in this scheme. The locks are set in
messages distributed in an atomic broadcast, so if any lock set by a transaction T1
precedes a lock set by a transaction T2, all of the locks set by T1 will precede those of T2 and
T1 will be able to complete without deadlock.

Another point was illustrated by Figure [...]. When locking is required, frequently the
setting of locks can be delayed, reducing the time interval in which items are locked. The
scheme presented does this by delaying ordering decisions, and distributing the decision of a
transaction's [...]



The hierarchical scheme for performing transactions achieves the goal of partial
operability to some extent. Without locking, a transaction can be performed as long as all of
the processes and communication links that lie on the paths between the processes that must
communicate in performing the transaction are functional. While this does not completely
achieve the goal, as it is possible that two processes will be prevented from performing some
transaction because of the unavailability of their parent in the hierarchy, the hierarchy can
be tailored to make this circumstance unlikely.



Locking introduces the possibility that a process will be prevented from performing 
local operations because a failure has delayed the transaction setting a lock. In the next 
chapter, I present a mechanism to deal specifically with this problem. 

This chapter introduced a method for analyzing the patterns of the accesses
performed by transactions, namely transaction graphs and activity graphs. Using these
transaction graphs, we demonstrated that sequencing of messages between processes is not
itself sufficient to provide synchronization for some sets of concurrent transactions. Three
classes of transactions were discussed. Many of the transactions that we expect to be
performed in an information system fall into either the class of transactions with independent
components or the class of transactions with predictable data flow. (The transactions in the
example system discussed in Chapter 6 are nearly all in the first class.) These are the simplest
transactions to synchronize.

A mechanism was presented to coordinate concurrent transactions using the atomic 
broadcasting mechanism developed in Chapter 3. This mechanism correctly synchronizes 
transactions of all three classes, but works most efficiently (in terms of the number of 
messages needed) on transactions in the first two classes. The mechanism can be optimized 
to perform those transactions that are known to be important at the time of the design of the 
system. 

The implementation of this mechanism was considered to show how the messages are 
generated from a description of the transaction, and how the processes are implemented.
This implementation demonstrated techniques to reduce the amount of overhead caused by 
transactions requiring locking. Finally, the important properties of this solution were 
summarized. 



Chapter 5
Polyvalues: A Mechanism for Performing Atomic Updates to
Distributed Data



In this chapter, I consider the implications of using locking on the problem of
achieving the goal of partial operability. First, I show that no system that uses locking can
achieve this goal. A mechanism is presented that solves this problem, by allowing a process
that is participating in a transaction and has set a lock to install the results of that
transaction conditionally, so that it can release the lock and continue processing other
transactions before knowing whether or not the transaction setting the lock will be completed.

5.1 Motivation (The Trouble with Locking)

In the previous chapter, it was demonstrated that some form of locking is necessary
for synchronizing certain groups of transactions. Unfortunately, locking compromises the
goal of partial operability, as a site that has received a lock cannot perform local transactions
conflicting with that lock until the locking transaction is completed. One could imagine a
solution to this problem in which a site that has received a lock could abandon that lock,
aborting the transaction setting that lock. This must be done in such a way that if a lock is
abandoned, all of the sites participating in the transaction which set that lock will decide to
abort that transaction.

To achieve the goal of partial operability, each site must be able to decide whether 
or not to complete the transaction without consulting other sites. In this chapter, I refer to 
the decision of whether or not a transaction has been completed as the outcome of the 



In order to achieve the goal of partial operability, the locking must not exclude
transactions local to process X or to Y indefinitely. Failure of X or of Y or of the
communication network connecting them may, however, delay any message sent between the
two indefinitely. This means that each process must at any point be able to decide whether
or not to abort the transaction in progress without communication with other processes.

A protocol of message exchanges between X and Y that decides the outcome of a
transaction can be viewed as a series of process steps in each process. Each of these steps is
triggered by a message that may be delayed indefinitely, so that after each step, each process
must be prepared to decide whether or not to abort the transaction. This decision must be
based only on the information that that process had before beginning the protocol and the
information gained from messages received while performing the protocol. Both processes
must make the same decision at any point in the protocol.

If a failure delays messages after the first step of the protocol is performed in each of
the processes, at least one of the processes must decide to abort the transaction. This is true
because the transaction being performed requires locking, and a transaction requiring
locking cannot be performed with a single process step in each process. Therefore, after each
process has performed one step of the protocol, at least one of the processes must have
insufficient information to complete the transaction, and thus must decide to abort.

If the execution of the protocol is not delayed by a failure, a step must be reached 
after which one process would decide to complete the transaction if the next message in the 
protocol were delayed by a failure. Let the first step of either process after which that 
process decides to complete be known as the commit point of the transaction and assume 
that it is a step of process X. After the commit point, X would decide to complete the
transaction if a failure delayed the completion of the protocol. 



Now consider the decision made by process Y if a failure were to prevent
communication immediately after the commit point step. Because the commit point was not a
step of Y, Y cannot be affected by the completion of that step, and hence must make the same
decision before the commit point as after. This would be a contradiction, as Y must either
decide to complete before the commit point, violating the assumption that the commit point
was the first step after which either process decided to complete, or Y must decide to abort
after the commit point, resulting in an inconsistent decision.

This argument applies to any number of processes attempting to perform some
transaction requiring locking, and shows that there is no way to achieve the goal of partial
operability while performing transactions requiring locking. The argument depends on the
property of the process model that the immediate effects of a process step are
limited to one process, and that the observation of the completion of a process step by any
other process may be delayed indefinitely.

5.1.2 Approaches to the Problem of Abortable Locking

There are several approaches that can be used to reduce the probability that a
failure during the execution of a transaction requiring locking will cause indefinite delay.
These approaches provide only a partial solution to the problem of achieving the goal of
partial operability because a failure or combination of failures during the execution of a
transaction can cause indefinite delay of transactions that are completely local to a
functioning site, or cause the transaction to be performed inconsistently.



5.1.2.1 Accepting Inconsistency

One possible solution that has not been extensively used is to accept a small
probability that a transaction requiring locking will not be performed atomically if a failure
occurs at the wrong time. This approach is not appropriate for all applications, as strange,
inconsistent results may occur. If the consequences of not being able to perform some
transaction promptly are worse than the consequences of a synchronization error (as would
be the case for a transaction controlling the landing of an airplane), then it may be desirable
to use a protocol in which a failure at the wrong time may cause a transaction to be partially
performed, or may cause the transaction to be incorrectly sequenced with other transactions.
This kind of strategy has been used in image processing systems, in which the data base has
a great deal of redundancy that allows any observer to tolerate an inconsistent state. To my
knowledge, there are no distributed data management systems that use this approach.

5.1.2.2 Avoiding Locking 

Another approach is to use synchronization protocols that minimize the need for
locking. The protocols presented in Chapter 4 of this thesis and those used by the SDD-1
distributed data base system [Bernstein77] are two examples of this approach. In Chapter 4, I
examined the problem of organizing the data base so as to reduce the amount of locking
required. Locking cannot be avoided entirely, however, unless the data base is replicated so
that each site has a complete copy. Such replication eliminates locking, but makes all
transactions that update the data base require the participation of all of the sites, eliminating
transactions that are local to one site.



5.1.2.3 Minimizing the Window of Vulnerability

The approach most frequently taken to locking in a distributed system is to minimize
the time interval during which a failure causes indefinite delay. One example of this
approach is the two-phase commit protocol [...]. In this protocol, each site
participating in a transaction goes through two phases, a lock phase in which locks are set
and the site computes the results of the transaction, and a wait phase during which the site
has completed the computation related to the transaction and has the information necessary
to make the updates requested by the transaction without further input from other sites, but
does not yet know the outcome. If a failure delays the completion of the lock phase at a site,
the site can decide on its own to abort the transaction, and all sites will eventually decide on
their own to abort or be told of the decision to abort. If a failure occurs during the
wait phase, however, a site must wait until it learns the outcome of
the transaction.

Figure 5.1 gives a finite state machine description of the action of one of the sites in
this protocol. The figure shows four states of the site. The allowed
transitions are shown by the arcs, each of which is labeled with the message received to
begin this transition, and the message sent in making the transition (in italic type).

In the lock state, a site waits for messages containing the information necessary to
complete its portion of the transaction by determining the new values for the items at that
site updated by the transaction. After these have been received, the site enters the wait
phase and sends an acknowledgement message indicating this fact. If a failure delays the
reception of messages by a site in the lock phase, that site can abort the transaction by
sending an abort message and entering the abort state, discarding any computation done by
the transaction. In either the abort or the done states, the site is ready to accept new
transactions. The acknowledgement and abort messages sent by the sites are accumulated by
a coordinator for the transaction until either all sites have acknowledged, or any
site has sent an abort. The coordinator then generates done or abort messages for all of the
participants.
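
The behavior of a single site, and the rule applied by the coordinator, can be sketched as a small transition table. This is an illustrative reading of the description above and not a reproduction of Figure 5.1: the event names, the `step` and `coordinator_decision` helpers, and the choice to let a site in the lock state accept an abort message are assumptions.

```python
# Allowed transitions for one site, keyed by (state, event received).
# Each entry gives (next state, message sent); None means nothing is sent.
TRANSITIONS = {
    ("lock", "new_values"): ("wait", "acknowledgement"),
    ("lock", "timeout"): ("abort", "abort"),
    ("lock", "abort"): ("abort", None),      # assumed: abort heard while computing
    ("wait", "done"): ("done", None),
    ("wait", "abort"): ("abort", None),
}

def step(state, event):
    """Return (next_state, message_sent), or raise if the event is not allowed."""
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"event {event!r} is not allowed in state {state!r}")

def coordinator_decision(replies, participants):
    """Return "abort", "done", or None while replies are still outstanding.

    `replies` maps a site name to the message it sent, if any.
    """
    if "abort" in replies.values():
        return "abort"
    if all(replies.get(site) == "acknowledgement" for site in participants):
        return "done"
    return None

sites = ["X", "Y"]
undecided = coordinator_decision({"X": "acknowledgement"}, sites)
decided = coordinator_decision({"X": "acknowledgement", "Y": "acknowledgement"}, sites)
```

The table has no `timeout` entry for the wait state: a site there cannot abort on its own, which is the interval during which a failure causes indefinite delay.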

The motivation behind this protocol is that the time that each site spends during its
lock phase computing the results of the transaction is likely to be longer than the time spent
during the wait phase. This is not necessarily true, as one site may take much longer than
the others to compute its portion of the transaction, causing the remaining sites to remain in the
wait phase for a long period of time.

Figure 5.1
A Two-Phase Commit Protocol
[State diagram; arc labels: Timeout; Receive New Values, Send Acknowledgement; Receive Abort; Send Abort]

Lampson and Sturgis [Lampson76] present another commit protocol that includes an
extra round of message exchanges to avoid this problem. In their protocol, no site enters its
wait phase until all of the computation of the transaction has been completed at all sites.

5.1.2.4 The Polyvalue Approach

The motivation behind preventing a transaction from holding on to a lock
indefinitely is to be able to run other transactions that need to access the data that has been
locked without indefinite delays. Frequently, the results produced by a transaction depend
only loosely on the input values seen by that transaction. If the outputs to be produced by a
transaction holding a lock are known but the outcome of the transaction is uncertain, there
are two possible sets of current values for the updated items. One could use these two sets of
values to determine, for some transaction waiting to access the updated values, whether or not
that transaction depends on which of the possible sets of values is correct. Any transaction
that does not depend on which set of values is used can be run using either set before the
outcome of the transaction with the lock is decided. The polyvalue scheme described in the
next section is a generalization of this idea.

5.2 The Polyvalue Mechanism for Avoiding Delay Due to Locking

This section presents a mechanism that in many cases solves the problem of insuring 
that no transaction is delayed indefinitely due to a lock set by some other transaction. 



5.2.1 The Polyvalue Concept

If a two-phase commit protocol is used to perform a transaction, a site that has
reached the wait phase knows the output values of the transaction. If those values could
somehow be conditionally installed, such that a transaction accessing one of the updated
items would see both values, then the locks on the updated items could be released. This can
be accomplished by installing what I refer to as a polyvalue for each updated item. A
polyvalue is a bookkeeping tool for keeping track of several potential current values for an
item, depending on the outcome of currently pending transactions.

A polyvalue is a set of pairs, <v,c>, where v is a value and c is a condition, which is 
a predicate on a set of identifiers for transactions. The pair <v,c> in a polyvalue for some
item I specifies that I has value v whenever c is true when c is evaluated in a model where 
transaction identifier T is true if T has been completed. The conditions in a single 
polyvalue must be disjoint (no assignment of truth to the transaction identifiers makes two 
conditions in the same polyvalue true) and complete (for any assignment of truth values, one 
condition is true). 
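
The two requirements on the conditions of a polyvalue can be checked mechanically. The sketch below assumes one particular representation that is not taken from the thesis: a condition is stored as the set of truth-table rows (one boolean per pending transaction) under which it holds, so that disjointness is an empty intersection and completeness is a union covering every row. The names `TXNS`, `ALL`, and `is_well_formed` are hypothetical.

```python
from itertools import combinations, product

TXNS = ("T",)                                    # one pending transaction (assumed)
ALL = frozenset(product([False, True], repeat=len(TXNS)))
T = frozenset(row for row in ALL if row[0])      # rows in which T has completed

def is_well_formed(polyvalue):
    """Check that the conditions of a list of (value, condition) pairs
    are disjoint and complete."""
    conditions = [cond for _value, cond in polyvalue]
    disjoint = all(not (a & b) for a, b in combinations(conditions, 2))
    complete = frozenset().union(*conditions) == ALL
    return disjoint and complete

good = [(200, T), (100, ALL - T)]    # <200, T> and <100, not T>
overlapping = [(200, T), (100, ALL)] # both conditions hold when T completes
incomplete = [(200, T)]              # no value is given if T aborts
```

Enumerating rows grows exponentially with the number of pending transactions, so this representation is only meant to make the definitions concrete.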

Each transaction is assigned a unique transaction identifier. When a site that has
reached the wait phase for a particular transaction T cannot determine quickly whether T
will be completed or aborted, that site installs polyvalues for all of the items that T is trying
to update. The polyvalue installed for an item I has two pairs, <v',T> and <v,¬T>, where v
was the value of I before the execution of T, and v' is the value produced by T. This
polyvalue describes the possible values that could be the current value of I, depending on
the eventual outcome of T.



Before installing a polyvalue in an item, individual pairs
are expanded. Any pair <v,c> where v is itself a polyvalue is replaced by a group of pairs.
This group contains one pair of the form <v_i,c_i∧c> for each pair <v_i,c_i> that was in v. Next,
redundant pairs are coalesced. The pairs <v1,c1> and <v2,c2>, where v1 = v2, are replaced by
the single pair <v1,c1∨c2>. These redundant pairs can arise because it is possible that several
different possible outcomes of the pending transactions could produce the same value for an
item. Finally, the condition attached to each pair is simplified, and any pair <v,c> for which
c is logically false is discarded.

This simplification procedure reduces the polyvalue constructed to one in which each
pair has a simple value, and the number of pairs is minimized. A polyvalue with a single
pair <v,c> must have a condition c which is logically true, and is indistinguishable from a
simple value. Thus the procedure for constructing polyvalues for the results of a pending
transaction can be described without treating the cases where the new or old values of the
updated items are themselves polyvalues as special in any way.
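
The three steps of the simplification procedure (expand, coalesce, discard) can be sketched as below. As an assumption of this sketch, a condition is represented by the set of truth-table rows under which it holds, so that AND is set intersection, OR is set union, and a logically false condition is the empty set. The names `completed` and `simplify` and the item values are illustrative.

```python
from itertools import product

TXNS = ("T1", "T2")                      # pending transactions (assumed names)
ALL = frozenset(product([False, True], repeat=len(TXNS)))

def completed(name):
    """Condition "transaction `name` completed" as a set of truth-table rows."""
    i = TXNS.index(name)
    return frozenset(row for row in ALL if row[i])

def simplify(pairs):
    """Expand nested polyvalues, coalesce equal values, drop false conditions.

    `pairs` is a list of (value, condition); a value that is itself a
    polyvalue is given as a nested list. Returns {value: condition}.
    """
    merged = {}

    def add(value, cond):
        if isinstance(value, list):      # v is itself a polyvalue: expand it
            for inner_value, inner_cond in value:
                add(inner_value, inner_cond & cond)
        elif cond:                       # an empty set is logically false
            merged[value] = merged.get(value, frozenset()) | cond

    for value, cond in pairs:
        add(value, cond)
    return merged

T1, T2 = completed("T1"), completed("T2")
old = [(200, T1), (100, ALL - T1)]       # previous value, itself a polyvalue
new = [(100, T1), (0, ALL - T1)]         # value computed by pending T2
item = simplify([(new, T2), (old, ALL - T2)])
```

Here the value 100 arises under two different outcomes, so its two pairs are coalesced into one pair whose condition is the OR of the two.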

5.2.2 Performing Transactions on Polyvalues

A transaction operating on polyvalued inputs is referred to as a polytransaction. A
polytransaction produces polyvalued results, each of which specifies the real output value
produced under any possible outcome of pending transactions. The following is a simple
discussion of polytransactions.

I will first describe the computation phase of a polytransaction, in which input values
are read and outputs are computed. Each polytransaction T consists of a set of alternative
transactions T_c, each of which performs the same transaction on a different set of values for
input items. Each alternative transaction T_c is tagged with a condition c which is derived
from the conditions on the input values read by T_c. Each polytransaction begins with a
single alternative transaction T_true, which begins to access items in performing the
transaction. When an alternative transaction T_c accesses an item whose current value is a
polyvalue v = {<v_i,c_i>}, T_c is partitioned into alternative transactions T_c∧c_i, each
of which has the same history as T_c, and each of which accesses one value v_i from v and
acquires the corresponding condition c_i, in addition to the previous condition, c, on T_c. If
c∧c_i is logically false, then T_c∧c_i can be abandoned, and not computed.¹

Thus the number of alternative transactions grows as a polytransaction T is run.
Each of these alternative transactions runs up to the wait phase (i.e. each runs until the
outputs have been computed and distributed to all of the appropriate sites). Each site
receiving outputs of T constructs a polyvalue for each item I to be updated. This polyvalue
contains the pairs <v,c> where v is the value produced by T_c for I.

If all alternative transactions of T produce outputs for some item I, then this set of
pairs will be complete and disjoint.² If, however, there are some alternatives of T which do
not produce a value for I, then the conditions of the alternatives which do produce values
for I will not be complete. This can happen if the decision of whether or not T updates I
depends on the input values seen by T. Under any outcome of pending transactions for
which T will not produce a new value for I, I would retain its previous value. Therefore, if
the conditions on the alternatives of T which produce a new value for I do not form a
complete set, another pair <v',¬c'> is added,³ where v' is the previous value of I, and c' is the
logical OR of all of the conditions on the new values for I. The wait phase of a
polytransaction T is as described above. Should a communication failure interfere with
the wait phase, a polyvalue can be produced from the outputs of the polytransaction and the
previous values for the items updated by the polytransaction.

1. As will be shown, outputs produced by an alternative transaction with a condition that is
logically false will never be used.

2. T begins with a single alternative with condition true. As the computation phase of T
progresses, alternatives of T are partitioned according to the conditions on the polyvalues
that they access. Because the conditions on the pairs of any individual polyvalue are
complete and disjoint, the conditions on the alternatives of T are at any point complete and
disjoint.

3. One could alternatively always add this pair, and rely on the simplification procedure to
discover that ¬c' is logically false when the other conditions are complete.

5.2.3 A Simple Example

Let us consider a simple example involving three items at three sites, and three
transactions on those items. Let A, B, and C be the items, and let the transactions be:

T1 = if A ≥ 100 then {A ← A - 100; B ← B + 100}
T2 = if B ≥ 100 then {B ← B - 100; C ← C + 100}
T3 = if B > 10 then B ← 1.05*B

Now assume that before the transactions are run, each item has value 100. If a failure occurs
during the wait phase of T1 preventing the site holding B from learning the outcome of T1,
then that site gives B a polyvalue of {<200,T1>, <100,¬T1>}. If T2 is now run, it will be run
as a polytransaction, because of the polyvalue of B. T2 would produce new values for B and
C of {<100,T1>, <0,¬T1>} and 200. If a failure occurs during the wait phase of T2 again
preventing the site holding B from learning the outcome of T2, then, after simplification, B
receives a polyvalue of {<0,¬T1∧T2>, <100,(T1∧T2)∨(¬T1∧¬T2)>, <200,T1∧¬T2>}. Now, if
T3 is run, it is performed as three alternative transactions. Two of these alternative
transactions produce updated values for B, while the alternative for ¬T1∧T2 does not,
because the input value for B read by that alternative transaction is too small. Thus the
polyvalue assigned to B by T3 after simplification is {<0,¬T1∧T2>,
<105,(T1∧T2)∨(¬T1∧¬T2)>, <210,T1∧¬T2>}.
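
The last step of the example, running T3 against the three-way polyvalue of B, can be replayed with a short sketch. It is not the mechanism of the thesis itself: it assumes a transaction that reads and writes a single item, represents a polyvalue as a dictionary from value to condition, and represents a condition as the set of truth-table rows (outcomes of T1 and T2) under which it holds. The names `run_polytransaction` and `t3` are hypothetical.

```python
from itertools import product

TXNS = ("T1", "T2")                            # pending transactions
ALL = frozenset(product([False, True], repeat=len(TXNS)))
T1 = frozenset(row for row in ALL if row[0])   # rows in which T1 completed
T2 = frozenset(row for row in ALL if row[1])   # rows in which T2 completed

def run_polytransaction(body, old):
    """Run `body` once per pair of the polyvalue `old` ({value: condition}).

    `body(value)` returns the new value, or None if that alternative
    makes no update. Returns the new polyvalue for the item.
    """
    new = {}
    updated = frozenset()                 # c': OR of conditions with a new value
    for value, cond in old.items():       # one alternative transaction per pair
        result = body(value)
        if result is not None:
            new[result] = new.get(result, frozenset()) | cond
            updated |= cond
    for value, cond in old.items():       # add <v', not c'>, already expanded
        keep = cond - updated
        if keep:
            new[value] = new.get(value, frozenset()) | keep
    return new

def t3(b):
    # T3: if B > 10 then B <- 1.05*B (integer arithmetic avoids float rounding)
    return b * 105 // 100 if b > 10 else None

b_before = {0: T2 - T1, 100: (T1 & T2) | (ALL - T1 - T2), 200: T1 - T2}
b_after = run_polytransaction(t3, b_before)
```

The alternative that reads 0 produces no update, so the previous value 0 is carried forward under its original condition, while 100 and 200 become 105 and 210.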

This example shows the mechanics of manipulating polyvalues to perform
transactions, even after the occurrence of improbable failures. From this example, it is hard 
to see what has been gained, as one cannot determine from inspection what the values in the 
data base are, or what transactions have been completed. 

The answer is that in many cases, a polytransaction will produce simple output
values. This is true of many query transactions, which attempt to determine whether or not
the value of some item falls in a certain range. In many cases a query about an item can be
answered without knowing the exact value of that item. A polyvalue can provide all of the
information necessary to answer common queries. Consider, for example, the test made by T2
on B. The decision made by this test is the same when applied to both components of the
polyvalue for B.

Another area where polyvalues are useful is that of transactions that have real world
effects, such as authorizing transfers of money, or allocating a real world resource, like a seat
on an airplane. For such transactions, it is frequently more important to know what the real
world effect is than to know what the eventual values in the data base are. If such a
transaction is run on an input set containing polyvalues, then the real world effect can be
accurately determined when all alternatives produce the same effect. In many applications,
important real world effects can be determined without knowing the exact values in the
data base.



Consider, for example, a transaction which is to withdraw funds from a savings
account for which the current balance is represented by a polyvalue. The important effect
that the transaction must decide quickly is whether or not the customer is to receive the cash
from the withdrawal. Computing exactly the new balance in the account need not occur
rapidly. The transfer of funds depends only loosely on the balance in the account in that it
need only be determined that that balance is, under all possible outcomes of pending
transactions, greater than the amount withdrawn. Thus in most cases the withdrawal can be
quickly authorized.
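
The withdrawal decision can be sketched as a test applied to every possible balance. The function name, the string labels standing in for conditions, and the choice to treat a balance equal to the amount as sufficient are assumptions of this sketch.

```python
def authorize_withdrawal(balance_polyvalue, amount):
    """Return True or False when every possible balance gives the same
    answer, and None when the answer depends on pending transactions.

    `balance_polyvalue` is a list of (value, condition) pairs; the
    conditions are not inspected, since only agreement matters here.
    """
    decisions = {value >= amount for value, _cond in balance_polyvalue}
    if len(decisions) == 1:
        return decisions.pop()
    return None            # must wait for the outcome of a pending transaction

balance = [(500, "T"), (300, "not T")]   # T is a pending deposit of 200
```

With this balance a withdrawal of 200 can be authorized at once, a withdrawal of 600 can be refused at once, and only a withdrawal between the two possible balances has to wait.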

5.3 Recovery of Pending Transactions

The mechanism described above installs polyvalues for the results of a transaction T
delayed in the wait phase by a temporary failure. When that failure is recovered, the wait
phase of T can be completed, determining whether T is to be completed or aborted. Thus
the value of the transaction identifier for T appearing in conditions in the pairs of
polyvalues can then be determined.

A site learning of the completion or abortion of a transaction T can reduce its
polyvalues by re-evaluating any condition that depends on the outcome of T, substituting
either true or false for T depending on whether T was completed or aborted. This
substitution simplifies conditions that involved T, and upon simplification, some of these
conditions may become logically false. Thus knowledge of the completion or abortion of
pending transactions can be used to reduce the number of possible values which a polyvalue
represents. Eventually, if the outcome of all pending transactions is known, each polyvalue
will have only one pair with a condition that is not logically false, and thus can be reduced
to a simple value. Some mechanism must be provided, however, to propagate the knowledge
of the outcome of a transaction T to sites holding polyvalues with conditions involving T.
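
Reduction of a polyvalue after an outcome becomes known can be sketched as follows. As an assumption of the sketch, a polyvalue is a dictionary from value to condition, and a condition is the set of truth-table rows (outcomes of the pending transactions) under which it holds, so substituting an outcome amounts to discarding the rows that contradict it. The name `reduce_polyvalue` and the item values are illustrative.

```python
from itertools import product

TXNS = ("T1", "T2")                            # pending transactions (assumed)
ALL = frozenset(product([False, True], repeat=len(TXNS)))
T1 = frozenset(row for row in ALL if row[0])
T2 = frozenset(row for row in ALL if row[1])

def reduce_polyvalue(polyvalue, txn, was_completed):
    """Substitute the outcome of `txn` into every condition.

    Pairs whose condition becomes logically false are dropped. If a
    single pair survives, its simple value is returned.
    """
    i = TXNS.index(txn)
    consistent = frozenset(row for row in ALL if row[i] == was_completed)
    reduced = {
        value: cond & consistent
        for value, cond in polyvalue.items()
        if cond & consistent
    }
    if len(reduced) == 1:
        return next(iter(reduced))
    return reduced

item = {0: T2 - T1, 100: (T1 & T2) | (ALL - T1 - T2), 200: T1 - T2}
after_t1 = reduce_polyvalue(item, "T1", True)       # T1 completed
after_t2 = reduce_polyvalue(after_t1, "T2", True)   # T2 completed as well
```

Learning that T1 completed removes the pair for 0; learning that T2 also completed leaves the simple value 100.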

Such a mechanism must insure that all sites that hold a polyvalue with a condition 
dependent on a transaction T will eventually learn of the outcome of T. We also desire that
knowledge of T be deleted when it is no longer necessary (i.e. when no condition involves T). 
The record of the completion or abortion of a pending transaction is similar to a commit 
record [Reed78] for that transaction. Unlike a commit record, however, knowledge of the 
outcome of a transaction may still be needed even after all of the output values of the 
transaction have been installed. Any polyvalue could potentially refer to any pending 
transaction. 

One could have each site maintain a table of outcomes of pending transactions, and 
use a system-wide garbage collection strategy to delete entries that are no longer relevant.
While this scheme would work, it would be inefficient in the case that dependence on the 
outcome of pending transactions does not in general spread very far. Most sites do not need 
to know the outcome of most pending transactions. 

Another possible mechanism is to give a site that creates a polyvalue for a pending 
transaction the responsibility of maintaining a record of the outcome of that transaction until 
such a record is no longer necessary. When a site wishes to reduce a polyvalue, it must ask
all of the sites that are responsible for maintaining a record of the outcome of the
transactions appearing in that polyvalue for those outcomes. To do so, information must be
passed along with the polyvalue to determine the relevant sites to ask. This scheme is
similar to that used with possibilities by Reed [Reed78].



There are several problems with using this scheme for keeping track of pending
transactions. One problem is that it is difficult to determine when the record of the
outcome of a pending transaction is no longer needed, and some form of garbage collection
may be necessary. A second problem is that the messages sent to inquire about the outcome
of a transaction may place a burden on the communication network, as the inquiring
message may be sent many times (once for each attempted access to the polyvalue) before the
outcome of the transaction is determined. The scheme described below overcomes these
problems by distributing the responsibility for maintaining the outcome of a pending
transaction among the sites that have polyvalues dependent on that outcome.

Each site maintains a table, referred to here as the polyvalue table, listing the items
that it holds that currently have polyvalues. This table is used to locate all of the polyvalues
that can be reduced when the site receives a message indicating the outcome of some
pending transaction. A second table maintained at each site, known as the transaction table,
keeps track of the spread of knowledge of pending transactions. Each entry of the
transaction table contains a transaction identifier, its outcome (completed, aborted, or
pending), and a list of sites to which this site has sent information dependent on the outcome
of that transaction.

To maintain its transaction table, a site must make an entry for each transaction
identifier that appears in a condition of a polyvalue at the time that that polyvalue is
installed.¹ When a site sends a message containing a polyvalue to some other site, it must
record the name of the site to which the polyvalue was sent in the transaction table entry for
each transaction identifier that appears in a condition in that polyvalue.

1. No action is required if the site already has a table entry for that transaction.

The information in the transaction tables in the various sites is used to control the 
distribution of knowledge of transaction outcomes. Each site that receives a commit or an 
abort message for a transaction that it previously knew as pending can update its table entry 
for that transaction, and reduce any polyvalues that depended on that transaction. A site is
responsible for informing all of the sites that are listed in its transaction table entry for the
transaction of the outcome. This list was constructed to include all of the sites that were
given information dependent on the outcome of the transaction, and therefore may hold 
polyvalues dependent on that outcome. Once all of these sites have been informed, the table 
entry for the transaction can be deleted. 
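The bookkeeping described in the last few paragraphs can be sketched as follows. This is an illustrative Python model under several assumptions that are not part of the thesis: message delivery is replaced by direct method calls through a shared `network` dictionary, conditions are conjunctions of literals, and a transaction table entry stores only the list of sites informed (the entry is deleted as soon as the outcome has been passed on, so the outcome itself is not stored). All class and method names are hypothetical.

```python
from typing import Dict, List, Set, Tuple, Union

Condition = Dict[str, bool]            # txn id -> required outcome (True = completed)
Polyvalue = List[Tuple[int, Condition]]
Value = Union[int, Polyvalue]


def substitute(pv: Polyvalue, txn: str, completed: bool) -> Value:
    """Apply a known outcome; return a simple value if one unconditional pair is left."""
    kept = [(v, {t: o for t, o in c.items() if t != txn})
            for v, c in pv if c.get(txn, completed) == completed]
    return kept[0][0] if len(kept) == 1 and not kept[0][1] else kept


class Site:
    def __init__(self, name: str, network: Dict[str, "Site"]) -> None:
        self.name = name
        self.network = network                  # stand-in for message delivery
        network[name] = self
        self.items: Dict[str, Value] = {}
        self.polyvalue_table: Set[str] = set()  # items currently holding polyvalues
        self.transaction_table: Dict[str, Set[str]] = {}  # pending txn -> sites sent to

    def install(self, item: str, value: Value) -> None:
        self.items[item] = value
        if isinstance(value, list):
            self.polyvalue_table.add(item)
            for _, cond in value:
                for txn in cond:
                    # No action is taken if an entry for txn already exists.
                    self.transaction_table.setdefault(txn, set())
        else:
            self.polyvalue_table.discard(item)

    def send(self, item: str, dest: str, dest_item: str) -> None:
        value = self.items[item]
        if isinstance(value, list):
            for _, cond in value:
                for txn in cond:
                    self.transaction_table[txn].add(dest)  # remember whom we told
        self.network[dest].install(dest_item, value)

    def learn_outcome(self, txn: str, completed: bool) -> None:
        informed = self.transaction_table.pop(txn, None)   # delete the entry
        if informed is None:
            return  # nothing held here depends on txn
        for item in list(self.polyvalue_table):
            self.install(item, substitute(self.items[item], txn, completed))
        for dest in informed:
            self.network[dest].learn_outcome(txn, completed)
```

Removing the table entry before informing the listed sites keeps the sketch from looping when two sites have each sent the other a polyvalue that depends on the same transaction.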

With this scheme, knowledge of a pending transaction propagates only to those sites
which have received polyvalues dependent on the outcome of that transaction. If a great
deal of computation has been based on the outputs of a pending transaction, then informing
all of the appropriate sites of the outcome of that transaction may require many message
exchanges.¹ If the outputs of a pending transaction are not used, however, only the sites that
hold those outputs need be informed of the outcome of the transaction.

1. In fact, if the polyvalues depending on a pending transaction are used frequently, a site
may have to be informed of the outcome of that transaction several times. It is possible for a
site to receive a polyvalue dependent on the outcome of a transaction after that site had been
informed of the outcome of that transaction and had forgotten that outcome. A site does not
need to remember transaction outcomes indefinitely.

Figure 5.2 shows how this scheme works in the example described above. Let T₁
and T₂ be the two transactions described earlier on items A, B, and C. Assume that these
items are held by sites A, B, and C respectively. The figure shows the values of these items
and the tables of pending transactions in the sites maintaining these items at several stages:
initially; after T₁ is suspended; after T₂ is suspended; after T₁ is eventually completed; and
after T₂ is eventually aborted.

5.4 Use of Polyvalues in the Hierarchical Locking Scheme

The discussion of polyvalues thus far has been at a relatively high level, so as to be
applicable to any distributed system in which locking without unbounded delay is needed.
The polyvalue mechanism described above could easily be incorporated into most of the
distributed update algorithms that appear in the literature. I shall now consider how to
apply these ideas specifically to the distributed locking scheme described in the previous
chapter.

Recall that in the locking scheme of the previous chapter, any process producing
outputs to a transaction depending on inputs obtained from another process which is not one
of its ancestors in the hierarchy is sent a lock request message. The lock request message
causes the process to refuse to receive any new messages pertaining to other transactions until
the transaction issuing the lock is completed. The processes involved in the transaction
exchange messages until each locked process has sufficient information to produce its outputs
and release its lock. In order to apply the concept of polyvalues, this locking strategy must
be modified so that each locked process goes through two phases: a computing phase in
which the process could abandon its lock and cause the transaction to be aborted, and a wait
phase in which the lock cannot be abandoned, but the output values are known.

I will first consider the case of a predictable transaction, where the set of processes
making updates is independent of the data values seen by the transaction. This assumption
simplifies the task of deciding when a transaction can be completed, as each process making
updates knows when it can complete those updates, and the set of processes making updates
is known in advance.

Figure 5.2
Recovery of Pending Transactions

Initial State

    Site   Value                                   Transaction Table
    A      100                                     (empty)
    B      100                                     (empty)
    C      100                                     (empty)

After T₁ is Suspended

    Site   Value                                   Transaction Table
    A      {<0,T₁>, <100,¬T₁>}                     T₁,pending,{}
    B      {<200,T₁>, <100,¬T₁>}                   T₁,pending,{}
    C      100                                     (empty)

After T₂ is Suspended

    Site   Value                                   Transaction Table
    A      {<0,T₁>, <100,¬T₁>}                     T₁,pending,{}
    B      {<0,¬T₁∧T₂>, <200,T₁∧¬T₂>,              T₁,pending,{C}
            <100,(T₁∧T₂)∨(¬T₁∧¬T₂)>}               T₂,pending,{}
    C      {<100,¬T₂>, <200,T₂>}                   T₂,pending,{}

After T₁ has been completed, and A and B have been notified

    Site   Value                                   Transaction Table
    A      0                                       (empty)
    B      {<100,T₂>, <200,¬T₂>}                   T₁,done,{C}
                                                   T₂,pending,{}
    C      {<100,¬T₂>, <200,T₂>}                   T₂,pending,{}

After C has been notified of T₁ and T₂ has been aborted

    Site   Value                                   Transaction Table
    A      0                                       (empty)
    B      200                                     (empty)
    C      100                                     (empty)

For each transaction, one process serves the function of transaction coordinator.
The transaction coordinator has the responsibility for determining when all of the processes
involved in the transaction have reached the wait phase. To begin the transaction, messages
must be sent to each of the processes involved in the transaction in a single atomic broadcast.
The protocol of Chapter 4 must be slightly modified to send each process that is to perform
an update a lock request message. Recall that the protocol of Chapter 4 sends lock requests
only to those managers that cannot complete the transaction in one process step. The extra
locking is needed in implementing polyvalues because we wish to be able to abort the
transaction if the completion of the transaction is delayed, and thus cannot allow any
manager participating in the transaction to complete its portion of the transaction before the
decision to complete is made.

Each process performing an update thus receives a lock request, along with any other
instructions for completing the transaction. When a process has enough information to
perform its update, it sends a "ready" message to the coordinator. (For any process whose
update can be made without inputs from other processes, this happens immediately.) Before
sending the "ready" message, a process can decide to abandon its lock at any point and cause
the transaction to be aborted. After sending the "ready" message, a process enters its wait
phase and cannot abandon its lock. When all of the processes that were sent lock requests have
answered "ready", the coordinator decides to complete the transaction and sends "complete"
messages to the back door ports of the processes which received locks. Upon receipt of the
"complete" message, a process completes its update and enables reception of new requests. If
too much time elapses before the coordinator receives "ready" messages from all locked
processes, the coordinator can abort the update by sending "abort" messages to all. The
"ready", "complete", and "abort" messages must all be identified with a unique identifier for
the transaction (probably assigned by the transaction process that initiated the transaction),
so that delayed messages do not cause confusion.
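The coordinator's side of this exchange can be sketched as a small state machine. The following Python is only an illustration under stated assumptions: messages are modelled as tuples returned to the caller rather than sent over a network, the timeout is delivered by an explicit `on_timeout` call, and a process that abandons its lock before answering "ready" is not modelled. The class and method names are hypothetical.

```python
from typing import List, Set, Tuple

Message = Tuple[str, str, str]  # (destination process, message kind, transaction id)


class Coordinator:
    def __init__(self, txn_id: str, locked: Set[str]) -> None:
        self.txn_id = txn_id
        self.locked = set(locked)        # processes that were sent lock requests
        self.waiting_for = set(locked)   # processes that have not yet answered "ready"
        self.decided = False

    def _broadcast(self, kind: str) -> List[Message]:
        self.decided = True
        return [(p, kind, self.txn_id) for p in sorted(self.locked)]

    def on_ready(self, process: str, txn_id: str) -> List[Message]:
        # The transaction identifier filters out delayed messages that
        # belong to some other transaction.
        if self.decided or txn_id != self.txn_id:
            return []
        self.waiting_for.discard(process)
        return self._broadcast("complete") if not self.waiting_for else []

    def on_timeout(self) -> List[Message]:
        # Too much time has elapsed: abort unless the decision was already made.
        return [] if self.decided else self._broadcast("abort")
```

Once a decision has been broadcast, later "ready" messages and timeouts are ignored, so a transaction is never both completed and aborted by its coordinator.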

Each process in this protocol goes through two phases, a lock phase before sending
the "ready" message, and a wait phase after sending that message. After having sent a "ready"
message, a process knows the new values that some items in its local state will take on as a
result of completing the update. The process can, instead of waiting for a "complete" or
"abort" message, decide to install polyvalues for these items. Each data manager process acts
like a site in the polyvalue scheme described in the first part of this chapter. Messages sent
from one process to another containing data items or results computed from data items can
contain polyvalues as well.






Two problems must be overcome in extending this scheme to arbitrary transactions.
First, the coordinator must be able to know when the transaction can be completed, as the set
of processes making updates is not known in advance. Second, each process that participates
in the transaction must be able to determine when it has received all of the messages that it
will receive as a part of the transaction, so that it knows when to enter the wait phase.

The first of these problems was discussed in Chapter 4, and the completion weight
mechanism was introduced for its solution. The transaction coordinator can also act as the
transaction monitor, which can determine when all of the components of the transaction have
been completed.

We can modify the completion weight scheme to allow uncertain transactions to be
performed with a two-phase protocol. Each process step of an uncertain transaction which
prepares a set of output values to be installed must return some completion weight to the
coordinator whether or not it also sends messages to other processes. The coordinator thus
receives messages containing completion weight from each process that has updated items to
be installed. When the completion weight sent to the coordinator reaches one, the
coordinator sends out lock-release messages as before. These lock-release messages are
distributed as described in Chapter 4.
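The accounting involved can be illustrated with exact fractions. The sketch below is an assumption-laden illustration, not the rules of Chapter 4: it supposes that the transaction starts with a total weight of one, and that each process step divides the weight it received evenly between a share returned to the coordinator and the messages it forwards. The names `process_step` and `WeightCoordinator` are hypothetical.

```python
from fractions import Fraction
from typing import List, Tuple


def process_step(weight: Fraction, forwards: int) -> Tuple[Fraction, List[Fraction]]:
    """Split the weight carried by an incoming message (even split is an assumption).

    Returns the weight returned to the coordinator and the weights attached
    to each of the `forwards` messages sent on to other processes.
    """
    share = weight / (forwards + 1)
    return share, [share] * forwards


class WeightCoordinator:
    def __init__(self) -> None:
        self.received = Fraction(0)

    def on_weight(self, weight: Fraction) -> bool:
        """Accumulate returned weight; True means lock-release messages can be sent."""
        self.received += weight
        return self.received == 1


coordinator = WeightCoordinator()
returned, forwarded = process_step(Fraction(1), 2)   # first step forwards two messages
done_after_first = coordinator.on_weight(returned)
done_flags = [coordinator.on_weight(process_step(w, 0)[0]) for w in forwarded]
```

Because every step returns some weight whether or not it forwards messages, the total reaches exactly one only after every step that prepared output values has reported. Exact rational arithmetic avoids the rounding problems that floating point shares would introduce.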

In this protocol, each manager can at any point decide to abandon its lock and
continue processing other transactions. To do so, a manager installs any updates that the
transaction has made as polyvalues, and simply ignores any further messages about that
transaction (except for the lock release or abort messages from the transaction coordinator).
This action may or may not cause a transaction to abort, depending on whether or not that
transaction requires further participation by the manager which has abandoned it. If the
manager abandoning the transaction is not needed to complete the transaction, then
eventually the completion weight returned to the coordinator will sum to 1, assuming no
other manager decides to abort. If, however, the manager deciding to abandon the
transaction must perform additional actions to complete the transaction (either by
supplying more inputs or making updates), the transaction cannot complete, because the
portion of the transaction dependent on the abandoning manager cannot be completed.
Eventually, the coordinator will decide to abort the transaction.

This scheme allows the poly value mechanism to be applied to the execution of 
uncertain transactions. In this scheme, each update made by the transaction goes through 
two phases, a lock phase before it is computed, and a wait phase after it has been computed, 
and the manager holding the updated item has replied to the coordinator.

Another point that should be noted about the use of poly values in the locking 
scheme of Chapter 4 is that the protocols that allow abortable locking described above may 
require that more lock requests be sent than the simple protocols of Chapter 4. Note, 
however, that any transaction that does not require locking with the simple protocols still 
does not require locking; we are only increasing the number of locks sent for transactions
that already require locking. 



5.5 Restricting the Spread of Polyvalues

The polyvalue mechanism is expensive in that polyvalues consume a great deal more
space than do simple values, and a polytransaction may require a great deal more
computation. The simple analysis of the polyvalue scheme and a simulation of the protocol
reported in an appendix to this thesis demonstrate that the expected number of
polyvalues in a distributed information system is quite small. Should further control be
necessary, any site can prevent the propagation of polyvalues by refusing to install polyvalues as
results of some pending transaction, and instead waiting until it learns the outcome of the
transaction, or by refusing to perform some transaction with polyvalue inputs, and instead
waiting until it can reduce the polyvalues. These decisions save resources at the expense of
possibly delaying important transactions that could have been performed promptly on
polyvalues.

In a system with real time response requirements, it is not unreasonable to expect that
the set of transactions that must be performed in order to produce needed results at the
proper time will be known. It is precisely these transactions that should be performed as
polytransactions, so that if possible, the needed results can be obtained despite uncertainty in
the database values due to the presence of pending transactions and polyvalues.

Consider a system controlling some manufacturing operation in which several
computers are used to control the manufacturing and are physically located near the
components that they monitor and control. Several different kinds of transactions act on the
data base. There are data entry transactions that are run periodically to enter data about
the operation being controlled into the database. There are also monitoring transactions
which are run periodically to determine whether or not the database values indicate any
potentially dangerous conditions requiring immediate corrective action. The monitoring
transactions are structured so that they examine only values local to some site in order to
insure that a communication failure cannot interfere with monitoring.

In addition to these two kinds of transactions, there are control transactions that
direct the completion of specific manufacturing tasks. There are also transactions that
implement administrative decisions to change the manufacturing process by modifying items
representing parameters to the control and monitoring transactions, and transactions that
allow the state of the manufacturing process to be examined. The monitoring transactions
need to be performed in real time in order to prevent failures in the physical components of
the manufacturing process or "bugs" in the control transactions from creating a hazardous
situation. These monitoring transactions examine the values produced by the data entry
transactions and the parameters of the process to detect problems. Any normal set of
parameters and data inputs will not trigger corrective action.



In order to insure that the monitoring transactions function in real time, polyvalues
should be used for any data items that might be read by the monitoring transactions. The
control effects of the monitoring transactions should be independent of the exact value of the
data items describing the process, as long as those data items reflect normal operation. The
transactions which direct specific manufacturing tasks and the transactions implementing
administrative decisions may involve updates to data items at several steps, and thus may
require locking. The locking performed for such transactions should allow the creation of
polyvalues for their outputs if some failure prevents the locks from being quickly released.
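A monitoring check over a polyvalue can be sketched as follows. This is a hypothetical Python illustration: the function name `monitor`, the example readings, and the thresholds are all invented for the example. It shows why a monitoring transaction can often produce a definite result even when the exact value is uncertain: the check is applied to every alternative, and the outcome is definite whenever all alternatives agree.

```python
from typing import Callable, Dict, List, Optional, Tuple

Condition = Dict[str, bool]                 # txn id -> required outcome
Polyvalue = List[Tuple[float, Condition]]


def monitor(reading: Polyvalue, is_normal: Callable[[float], bool]) -> Optional[bool]:
    """Apply a normal-operation check to every alternative of a polyvalue.

    Returns True or False when all alternatives agree, and None when the
    verdict depends on the outcome of a pending transaction.
    """
    verdicts = {is_normal(value) for value, _ in reading}
    return verdicts.pop() if len(verdicts) == 1 else None


# Hypothetical reading that depends on a pending data entry transaction T1.
pressure = [(98.0, {"T1": True}), (101.5, {"T1": False})]
loose_check = monitor(pressure, lambda p: p < 150.0)   # both alternatives are normal
tight_check = monitor(pressure, lambda p: p < 100.0)   # alternatives disagree
```

Only in the second case would the monitoring transaction have to produce a polyvalue result or wait for the outcome of T1.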

Transactions representing administrative control of the manufacturing process or
control of specific functions may be deemed less important, and may not be executed as
polytransactions if necessary. Any process holding items accessed by the monitoring
transactions, however, must be prepared to install polyvalues for those items to insure that
the monitoring transactions are not delayed.

5.6 Summary

This chapter has been devoted to a discussion of the "distributed atomic update"
problem. It was shown that it is impossible, given the failure semantics of the process model,
to construct a protocol which performs a distributed update atomically while not denying
access to the updated items indefinitely at any functioning site. Several strategies were
discussed to avoid the distributed atomic update problem in a transaction synchronization
scheme.

The remainder of the chapter presented a concept referred to as a polyvalue, which
may provide a practical solution to this problem in many cases. Polyvalues allow an update
to be performed conditionally, such that both the updated and non-updated values are
presented to subsequent transactions. In an information system where the most important
effects of transactions depend only loosely on the exact values stored in the data base, the
polyvalue scheme allows these important effects to be determined quickly, even when the
exact values of items in the data base are uncertain due to transactions that have been
started but not yet completed.

This chapter presented some simple examples of the mechanics of manipulating
polyvalues and discussed a possible application of polyvalues in a process control system.



Chapter 6
Application of the Techniques to the Design of a Distributed
Information System



The past four chapters of this thesis have presented various aspects of an overall
approach to the problem of robust synchronization in a distributed information system. In
this chapter, I present an example of a distributed information system and show how the
techniques that I have developed can be applied to derive a synchronization scheme that
satisfies the goals set forth in Chapter 1. This solution is compared with those using other
distributed synchronization schemes.

6.1 The Problem

The chosen example is an inventory control system for a chain of supermarkets. The
problem is adapted from an example given in [Bernstein77]. The data base is used to keep
track of the quantities of various products (cans of beans, paper napkins, etc.) on hand, on
order, or in transit at each individual market and at the warehouses that supply the markets.
The supply chain of the supermarkets is hierarchical, with groups of markets supplied by
local distributers, groups of local distributers supplied by regional distributers, and so forth.
The following sections describe the data and the transactions to be performed.



6.1.1 The Data Items 

For each location (warehouse or supermarket) the data base contains a set of data
items describing each product. These are:

Quantity on Hand (QOH) - The quantity of that product stored at
that location.

Desired Quantity on Hand (DQOH) - The goal of how much of
the product to try to keep at this location to satisfy demand

supermarkets). 

Re-order Quantity Threshold (RQT) - A minimum quantity of the
product to keep on hand. When QOH falls below RQT, an order
is submitted to bring QOH up to DQOH.

Quantity on Order (QOO) - The amount of the product that has
been ordered from the distributer for this location, but has not yet
been delivered.

Quantity in Shipping (QIS) - The amount of the product that has
been shipped from the distributer for this location, but has not yet
been delivered.

The data items pertaining to each of the products are independently examined and updated
(i.e. there is no single transaction that accesses input items pertaining to two or more
products), so I will consider only the items pertaining to a single product. In fact, a typical
supermarket may stock many different products, and these
items exist for each of these products.

The five items are maintained for each location, market or warehouse. To
distinguish between items describing different locations that are used by the same
transaction, I will use subscripts, such as QOH₀, to designate the level of the distribution
hierarchy to which an item pertains. Level 0 designates the local markets, while increasing
subscripts designate more global distributers. This is sufficient to distinguish the items
because each transaction accesses items pertaining to at most two locations: a location and its
supplier.

6.1.2 The Transactions 

The transactions in this system serve both to reflect real world effects, such as the 
unloading of a truck, to the data base, and to determine when some real world action should 
be performed to keep supplies of all products available. For each product there are four 
different kinds of transactions: Point of Sale, Re-Order, Shipping, and Receiving.

Point of sale transactions (P transactions) update the quantity on hand to reflect a 
customer purchase. P transactions take place only on the QOH for the locations
corresponding to supermarkets, and not on those for distributors. For a typical
supermarket, there are about 25,000 P transaaions per day. 

Re-order transactions (O transactions) generate new orders for merchandise which
has been depleted. An O transaction examines the QOH, QOO, RQT, and DQOH for
some location and produces a new value for QOO. For a typical supermarket, approximately
2000 O transactions are performed per day to determine which products must be ordered.

Shipping transactions (S transactions) reflect action by a distributer to fill an order.
A shipping transaction examines the QOH of the distributer and the QIS and QOO of one
of its customers in order to decide how much of the product to ship to that customer. The S
transaction updates the QOH of the distributer and the QIS of the customer to reflect the
shipping decision. S transactions are performed at the rate of about 19 per day per location.



Receiving transactions (R transactions) record the arrival of shipped goods at a
location. Each R transaction adds the amount received to QOH, and subtracts it from QIS
and QOO. About 15 R transactions take place for each site each day.
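The four kinds of transactions can be sketched as simple functions over the five data items. This Python sketch is illustrative only. The source leaves the exact computations open, so the re-order rule and the shipping rule below are assumptions made for the example, as are all of the names.

```python
from dataclasses import dataclass


@dataclass
class Location:
    qoh: int        # Quantity on Hand
    dqoh: int       # Desired Quantity on Hand
    rqt: int        # Re-order Quantity Threshold
    qoo: int = 0    # Quantity on Order
    qis: int = 0    # Quantity in Shipping


def point_of_sale(market: Location, q: int) -> None:
    """P transaction: a customer buys q units."""
    market.qoh -= q


def reorder(loc: Location) -> None:
    """O transaction: reads QOH, QOO, RQT, DQOH and produces a new QOO.

    Assumed rule: when stock plus outstanding orders falls below RQT,
    order enough to bring the total up to DQOH.
    """
    if loc.qoh + loc.qoo < loc.rqt:
        loc.qoo += loc.dqoh - (loc.qoh + loc.qoo)


def ship(supplier: Location, customer: Location) -> int:
    """S transaction: reads supplier QOH and customer QIS, QOO.

    Assumed rule: ship what has been ordered but not yet shipped,
    limited by the supplier's stock.
    """
    q = min(supplier.qoh, customer.qoo - customer.qis)
    supplier.qoh -= q
    customer.qis += q
    return q


def receive(loc: Location, q: int) -> None:
    """R transaction: q units arrive; each item depends only on its old value and q."""
    loc.qoh += q
    loc.qis -= q
    loc.qoo -= q


market = Location(qoh=12, dqoh=50, rqt=10)
warehouse = Location(qoh=100, dqoh=500, rqt=80)
point_of_sale(market, 5)
reorder(market)
shipped = ship(warehouse, market)
receive(market, shipped)
```

Only `ship` touches items belonging to two locations, which is why the S transactions are the ones that need attention in the analysis of the data base organization.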

These transactions are summarized in Table 6.1. In the paper which is the source of
this example, the authors were unconcerned with the details of how each transaction derives
its outputs from its input values. I have therefore made some "educated guesses" in deriving 
a more complete description of the transactions. 

Note in particular that the receiving transactions are presumed to take as a
parameter the amount of the product received, and to use that amount to update the items
QOO, QIS, and QOH. An R transaction always has independent components, because the
new value of each of the items updated depends only on its previous value and on the
parameter Q. Another possible interpretation would be to use the value QIS to determine
the amount received, thus making the new values of QOO and QOH depend on QIS. I
believe that my interpretation more closely resembles what would happen in a real inventory
control system, as the parameter Q represents the amount actually received, and may not
correspond to QIS for a variety of reasons.

Table 6.1
Transactions for Inventory Control

Transaction   Description                                  Frequency

P             QOH₀ := QOH₀ - Q                             25,000

O             QOOᵢ := O(QOHᵢ, QOOᵢ, DQOHᵢ, RQTᵢ)           2,000

S             QOHᵢ₊₁ := S(QOHᵢ₊₁, QISᵢ, QOOᵢ)              19
              QISᵢ := S(QOHᵢ₊₁, QISᵢ, QOOᵢ)

R             QOHᵢ := QOHᵢ + Q                             15
              QOOᵢ := QOOᵢ - Q
              QISᵢ := QISᵢ - Q

Having a complete description of the data base and the transactions to be performed, 
we can now proceed to analyze the system using the techniques developed in Chapters 4 and 5.

6.2 Analysis of the Transactions

In this section, I present transaction graphs for the transactions to be performed by
the inventory control system. These are analyzed to explore the ways in which the
transactions interact with each other. This analysis is used to determine the protocols needed
to perform the transactions using several different organizations of the data base (choices of
which items are held at each site). The choice of the synchronization network for each of
these organizations is discussed. Finally, I discuss the use of polyvalues in this distributed
information system.

6.2.1 Transaction Graphs for this Application 

The transaction graphs for typical transactions from these four classes are shown in 
Figure 6.1. The P transactions are the simplest, as each P transaction accesses and updates a 
single data item. P transactions will have independent components in any organization of
the data base. 

The R transactions are somewhat more complex, as they access and update three
different items. As noted in the previous section, however, the new value of each of these
items depends only on its previous value. Therefore the transaction graph of an R
transaction does not contain arcs interconnecting the three updated items.

The O transactions update a single data item (QOO for some location), but do so
based on several inputs. As shown in Figure 6.1, the transaction graph for an O transaction
has arcs connecting QOH, RQT, DQOH, and QOO to QOO.

The S transactions are the most complex. Each S transaction updates two items (QIS
for some location and QOH for its supplier), based on the values of three different
items. The transaction graphs for these transactions thus contain cycles of arcs, connecting
QIS, QOH, and QOO to both QIS and QOH. These cycles indicate that S transactions are
likely to require locking in any organization of the data base.

The four kinds of transactions taking place at various levels of the hierarchical
organization of the locations interact. This interaction is summarized in Figure 6.2, which
shows a joint transaction graph for the transactions taking place at three levels of the
hierarchy. The joint transaction graph is constructed from the individual transaction graphs
in the same way that a joint activity graph is constructed from individual activity graphs.
To distinguish between the transactions taking place at different levels of the distribution
hierarchy, each transaction identifier is given a subscript to indicate the level that that
transaction pertains to.

The interactions among the transactions suggest that the processing to be performed 
exhibits a high degree of locality of reference. If the items are grouped so that all of the
items pertaining to a single location are maintained by a single manager, the only
transactions that require the participation of more than one manager are the S transactions.
These transactions represent only a small fraction of the total volume of transactions to be
run (though they probably represent a higher proportion of the processing, because they are
more complicated than the more frequent transactions).



Figure 6.1
Transaction Graphs

6.1a P Transactions
6.1b R Transactions
6.1c O Transactions
6.1d S Transactions








6.2.2 Organization of the Data



In this section, I consider several different ways in which the data items could be
assigned to data managers. Each of these organizations for the data base is discussed to
show how the four types of transactions would be performed in such an organization. The
actual choice of implementation would be based on the desired level of availability for the
data items as well as the cost of performing the transactions and the processing and storage
capacities of the sites holding the data manager processes.

A simple organization for this data base would be to assign all of the items
pertaining to one site to one data manager process which executes at that site. A joint
activity graph of the four transactions as performed in such an organization is depicted in
Figure 6.3. The graph shows that in this organization, the only type of transactions requiring
communication between data managers are the S transactions. All of the other transactions
can be performed by one of the managers alone, because all of the items involved in any of
the other transactions are under control of a single data manager.



Figure 6.3
An Activity Graph for a Simple Data Base Organization

Assignment of Items to Managers:

    M₀         M₁         M₂

    QOH₀       QOH₁       QOH₂
    QOO₀       QOO₁       QOO₂
    RQT₀       RQT₁       RQT₂
    DQOH₀      DQOH₁      DQOH₂
    QIS₀       QIS₁       QIS₂

In this organization, most transactions would require no inter-manager
synchronization at all, and the rare S transactions would require locking with any
synchronization network of the data manager processes (because of the cycle involved in each
S transaction). Because none of the items in this organization are replicated, this
organization requires the minimum possible storage space.



While the S transactions are rare, the locking required for each
S transaction is undesirable. Locking makes the items accessed by the locking
transaction vulnerable to failures during the execution of the transaction. Many strategies
can be used to reduce this vulnerability when locking is needed, such as using polyvalues (as
will be discussed in a later section), or running the S transactions at a time when there is
little other activity, such as after the stores have closed. We can avoid the necessity of
locking for the S transactions by reorganizing the data base.



The arcs from M₁ to M₀ and from M₂ to M₁ result from the necessity to update the
QIS items at one manager based on the QOH items at the higher level manager. We can
avoid this dependency by moving the item QISᵢ from manager Mᵢ to manager Mᵢ₊₁. A
joint activity graph for the resulting organization is shown in Figure 6.4.

In this organization of the data, all transactions except for the S transactions
are again completely local to one of the data managers. The S transactions are performed by
two of the managers and require communication. Unlike the previous organization,
however, this communication is one-way, such that if a hierarchy of managers is chosen such
that M_{i+1} is always a descendant of M_i, then the S transactions can be performed without
locking.
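
The difference between the two organizations can be checked mechanically. The following is
a minimal sketch in Python, not taken from the thesis. It assumes that an S transaction
between levels i and i+1 reads QOO_i and QOH_{i+1} and writes QOH_{i+1} and QIS_i (an
interpretation of the description in this section; the precise definition appears earlier),
and that an arc runs from one manager to another whenever an item read at the first
influences an item written at the second. All function names are hypothetical.

```python
from itertools import product

ITEM_KINDS = ("QOH", "QOO", "RQT", "DQOH")


def assignment(levels, qis_moved_up):
    """Map each item name to the manager holding it.

    qis_moved_up=False models the organization of Figure 6.3 (QIS_i at M_i);
    qis_moved_up=True models Figure 6.4 (QIS_i at M_{i+1}). For the highest
    level this names a manager one level above those shown in the figure.
    """
    holder = {}
    for i in range(levels):
        for kind in ITEM_KINDS:
            holder[f"{kind}{i}"] = f"M{i}"
        holder[f"QIS{i}"] = f"M{i + 1}" if qis_moved_up else f"M{i}"
    return holder


def s_transaction(i):
    """Assumed read and write sets of the S transaction between levels i and i+1."""
    reads = {f"QOO{i}", f"QOH{i + 1}"}
    writes = {f"QOH{i + 1}", f"QIS{i}"}
    return reads, writes


def manager_arcs(reads, writes, holder):
    """Arcs (source manager, destination manager) implied by one transaction."""
    return {
        (holder[r], holder[w])
        for r, w in product(reads, writes)
        if holder[r] != holder[w]
    }


def has_two_way_arc(arcs):
    """True if some pair of managers must pass information in both directions."""
    return any((dst, src) in arcs for src, dst in arcs)


simple = manager_arcs(*s_transaction(0), assignment(3, qis_moved_up=False))
improved = manager_arcs(*s_transaction(0), assignment(3, qis_moved_up=True))
```

Under these assumptions, `simple` contains arcs in both directions between M0 and M1 (the
cycle that forces locking), while `improved` contains only the arc from M0 to M1.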



Figure 6.4
An Activity Graph for a More Efficient Organization of the Data

Assignment of Items to Managers:

    M_0       M_1       M_2
    QOH_0     QOH_1     QOH_2
    QOO_0     QOO_1     QOO_2
    RQT_0     RQT_1     RQT_2
    DQOH_0    DQOH_1    DQOH_2
              QIS_0     QIS_1



Figure 6.4 shows the joint activity graph for 3 locations in the hierarchy of
distributors and supermarkets. In a real application, there would be several supermarkets for
each local distributor, and several local distributors. This makes the joint activity graph
somewhat more complicated, as shown by Figure 6.5.

Figure 6.5 shows the joint activity graph for this organization of the data, for a
system in which there are four supermarkets (thus four M_0 managers) being supplied by
two local distributors. Each manager and each transaction is labeled with two subscripts, the
first indicating the level in the distribution hierarchy and the second indicating the location
at that level to which the manager or transaction pertains. The graph is hierarchical, with
an arc from each manager to its parent. Notice, however, that using the apparent hierarchy
in Figure 6.5 as the synchronization network would not allow the transactions to be
performed without locking. Each manager (except for the M_0 managers) must obtain
information from its children in the hierarchy to perform the S transactions. If the arcs in
Figure 6.5 were reversed, then the transactions could be performed without locking by using
the hierarchy in the joint activity graph as a synchronization network. I will refer to an
activity graph of the form of Figure 6.5 as an inverted hierarchy to distinguish it from a
hierarchical graph in which there is an arc running from each manager to each of its
children.



Figure 6.5
A More Complete Activity Graph

Given this organization of the data base, we must choose a synchronization network
that allows the transactions to be performed with the protocols of the preceding chapters.
While the four classes of transactions described here do not involve any transactions that
access a large number of items, presumably in a real inventory control system there would be
other transactions, much less frequent than those in the four classes, which perform functions
such as changing the parameters DQOH and RQT, or allowing a user to obtain a snapshot of
the quantities of some item in the various locations. In order to provide the ability to
synchronize any possible transaction on the data, the organization of data managers must be
hierarchical.

Any hierarchy of data managers that is consistent with the inverted hierarchy
defined by the arcs in the joint activity graph must be some linear ordering of the nodes.
The conditions that M_{2,0} be a descendant of all managers, that some process be an
ancestor of all managers, and that there can be only one path between any pair of managers
force a linear ordering. This is not a very desirable organization for synchronizing the
transactions, because the message sent from some manager M_i to M_{i+1} in performing an S
transaction may have to be routed through many other managers that do not otherwise
participate in that transaction. This makes S transactions expensive and vulnerable to
failures; however, a failure occurring during an S transaction does not unnecessarily delay
other transactions, because there is no locking.
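
The routing cost of a linear ordering can be illustrated with a small sketch. The Python
below is only an illustration under stated assumptions: the pairing of supermarkets with
distributors and the particular ordering chosen are hypothetical, and the manager names are
written as strings such as "M0,0" for the manager at level 0, location 0.

```python
# Hypothetical supply relation for a system like that of Figure 6.5: which
# manager supplies each location. The pairing shown here is assumed.
SUPPLIER = {
    "M0,0": "M1,0", "M0,1": "M1,0",
    "M0,2": "M1,1", "M0,3": "M1,1",
    "M1,0": "M2,0", "M1,1": "M2,0",
}

# One possible linear ordering, listed from the ancestor of all managers down
# to the descendant of all managers.
CHAIN = ["M0,0", "M0,1", "M0,2", "M0,3", "M1,0", "M1,1", "M2,0"]


def is_consistent(chain, supplier):
    """Each supplier must be a descendant of (appear after) every location it supplies."""
    position = {name: k for k, name in enumerate(chain)}
    return all(position[child] < position[parent]
               for child, parent in supplier.items())


def relays(chain, sender, receiver):
    """Managers that must forward a message between sender and receiver."""
    a, b = chain.index(sender), chain.index(receiver)
    low, high = min(a, b), max(a, b)
    return chain[low + 1:high]
```

With this ordering, `relays(CHAIN, "M0,0", "M1,0")` lists three managers that take no other
part in the S transaction, which is the extra expense and exposure to failures described
above.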

Another alternative is to abandon the ability to perform any arbitrary transaction
and restrict the synchronization mechanism to acting on the four classes described above. If
we are only interested in performing these four transactions, then the only communication
among managers that is needed is that described by the joint activity graph of the four
transaction classes. A non-hierarchical synchronization network could be used to coordinate
the transactions. The inverted hierarchy of the joint activity graph meets the requirements
for a synchronization network outlined in Chapter 3 (it has only one path between any two
processes), and provides all of the necessary communication paths to carry out transactions
from the four classes.



Thus the data managers could be logically organized in an inverted hierarchy. All
of the transactions except the S transactions would, as before, involve only one data manager.
To perform an S transaction, a message would be sent to the manager M_i holding the item
QOO_i. This manager would obtain the value of this item and send it to the manager M_{i+1}
which holds QOH_{i+1} and QIS_i. This manager can then update these items to complete the
transaction with no locking.
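
The message flow just described can be sketched as follows. This is a simplified
illustration in Python, not the protocol of the thesis: the `Manager` class and `perform_s`
function are hypothetical, and the shipping rule (ship the ordered quantity, limited by the
quantity on hand at the supplier) is an assumption made only so that the example has
concrete arithmetic.

```python
from dataclasses import dataclass, field


@dataclass
class Manager:
    name: str
    items: dict
    inbox: list = field(default_factory=list)


def perform_s(lower, upper, i):
    """Run one S transaction between lower (M_i) and upper (M_{i+1}).

    Returns the quantity shipped. Exactly one inter-manager message is sent.
    """
    # Step 1: M_i reads QOO_i and sends the value to M_{i+1}.
    upper.inbox.append(("S", i, lower.items[f"QOO{i}"]))

    # Step 2: M_{i+1} updates QOH_{i+1} and QIS_i locally; nothing flows back.
    _, level, ordered = upper.inbox.pop(0)
    shipped = min(ordered, upper.items[f"QOH{level + 1}"])  # assumed shipping rule
    upper.items[f"QOH{level + 1}"] -= shipped
    upper.items[f"QIS{level}"] += shipped
    return shipped
```

Because information moves only from M_i to M_{i+1}, neither manager has to hold its items
while waiting for a reply from the other.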

This organization of the managers is clearly most efficient in terms of the amount of
locking and the number of messages sent, but has sacrificed the ability to coordinate other
kinds of transactions. One would probably not want to choose an organization of data
managers in which it is not possible to synchronize any arbitrary transaction, as this may
make it very difficult to implement new kinds of transactions. Still, in a situation in which
the transactions and the data base are permanently fixed, such as a distributed rocket
guidance system, choosing the logical organization of the data managers without considering
the kinds of new transactions that could be desired is not a problem.

6.2.3 Replicated Organizations of the Data Base

The two organizations of the data base described above have a single copy of each
data item. In this section, I consider organizations in which some of the data items are
replicated. Replication could be desired either for greater robustness (less chance that a
transaction will be delayed due to a site being inaccessible), or in order to eliminate locking
by making more of the transactions have independent components.

One could, by replication, make all of the transactions have independent components.
This effect could be achieved by making sure that whenever a manager holds a copy of
some item I, it also holds copies of all of the items needed by the transactions that update I
in order to make that update. For each item I held by a manager, that manager must also
hold copies of all items from which arcs in the joint transaction graph point at I. Thus, in
effect, each manager holding a copy of I must hold copies of all items that are linked to I by
a chain of arcs in the joint transaction graph. The joint transaction graph of Figure 6.2
indicates that a site holding a copy of the items QOH_i or QOO_i must also hold copies of the
items QOH_j, QOO_j, RQT_j, DQOH_j, and QIS_j for all j ≤ i, because of the chain of arcs
linking these items to QOH_i and QOO_i. This presents an awkward problem, as it means
that in order to make all transactions have independent components, a single site must hold
copies of all of the items and therefore must participate in all of the transactions. It
therefore does not seem practical to avoid locking through this approach.
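
The rule that a manager must hold every item linked to I by a chain of arcs amounts to a
transitive closure over the joint transaction graph. The Python sketch below shows the
computation only. The arcs produced by `build_arcs` are an illustrative guess at how the
items influence one another, not a transcription of Figure 6.2, so the resulting sets need
not match the ones stated in the text; both function names are hypothetical.

```python
def build_arcs(levels):
    """Illustrative item-to-item arcs (source item influences destination item)."""
    arcs = []
    for i in range(levels):
        # Assumed O transaction: reads QOH, RQT and DQOH, then updates QOO.
        for kind in ("QOH", "RQT", "DQOH"):
            arcs.append((f"{kind}{i}", f"QOO{i}"))
        if i + 1 < levels:
            # Assumed S transaction: QOO_i drives the updates to QOH_{i+1} and QIS_i.
            arcs.append((f"QOO{i}", f"QOH{i + 1}"))
            arcs.append((f"QOO{i}", f"QIS{i}"))
            arcs.append((f"QOH{i + 1}", f"QIS{i}"))
    return arcs


def required_copies(item, arcs):
    """All items linked to `item` by a chain of arcs pointing toward it."""
    preds = {}
    for src, dst in arcs:
        preds.setdefault(dst, set()).add(src)
    needed, stack = set(), [item]
    while stack:
        current = stack.pop()
        for src in preds.get(current, ()):
            if src != item and src not in needed:
                needed.add(src)
                stack.append(src)
    return needed
```

With these assumed arcs, `required_copies("QOH2", build_arcs(3))` pulls in items from both
lower levels, which illustrates how a chain of arcs forces a site high in the hierarchy to
hold copies of most of the data base.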

This particular application appears to have little need for replication to increase the
availability of data items. The transactions that are most critical to perform quickly are the
P transactions and the O transactions. While we could replicate the QOH items in order to
increase the availability of these items, there seems to be little point in doing so. Because the
P transactions are by far the most frequent, replicating the QOH items would add greatly to
the amount of communication and possibly the amount of computation required. A more
appropriate approach might be to make the sites which hold the QOH items highly
reliable. Another approach that could be used is to use several sites to hold the data items
pertaining to each supermarket, partitioning the items so that for each product, there is one
site that holds all of the items that pertain to that product. This approach may allow the
individual sites to be smaller, simpler, and more reliable than a single site managing all
items for a supermarket.



It would also, presumably, be important that O transactions be executed promptly, to
insure that supplies of products remain adequate. In order to make the O transactions less
vulnerable to failures, we could replicate the items accessed by the O transactions.
Unfortunately, the transactions at each location access many of the items for that location,
including the QOH items. Thus, replicating items to make the O transactions more reliable
would make many of the transactions more expensive, due to the necessity to update the
copies of the QOH items.

Another organization of the data that might be used is to replicate the QIS_i and
QOO_i items so that M_i and M_{i+1} each have copies of both items. Figure 6.6 shows a joint
activity graph for this organization. In this organization, M_i and M_{i+1} each have copies of
the items pertaining to orders sent from location i to location i+1. This organization does not
provide any reliability advantage over the first organization considered in performing the
four transactions. Having the QIS and QOO items replicated may, however, allow the
human managers in charge of shipping and receiving at the sites to determine the status of
orders more easily, even if a failure interrupts communication between locations.

6.2.4 The Use of Polyvalues

Another way of increasing the availability of the data items in the event of a failure
is to use the polyvalue mechanism described in Chapter 5. As noted above, the P
transactions are the most critical. While locking in all of the above organizations of the data
base is rare, it is possible that a failure during one of the S transactions could delay access to
the items used by those transactions. This could in turn delay other transactions.

Figure 6.6
An Activity Graph for a Redundant Data Base Organization

Assignment of Items to Managers:

    M_0       M_1       M_2
    QOH_0     QOH_1     QOH_2
    QOO_0     QOO_1     QOO_2
    RQT_0     RQT_1     RQT_2
    DQOH_0    DQOH_1    DQOH_2
    QIS_0     QIS_1     QIS_2
              QIS_0     QIS_1
              QOO_0     QOO_1

By using the polyvalue mechanism described in Chapter 5, we can avoid this delay.
Two factors suggest that the polyvalue mechanism would be effective in eliminating
unnecessary delay of transactions local to one site by failures of other sites. First, many of the
transactions depend only loosely on the actual data base values. The O transactions, for
example, make a decision of whether or not to order that depends only loosely on the items
read. Second, very few of the transactions require locking, thus the probability that a
transaction requiring locking will be interrupted by a failure is small.
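
The first factor can be made concrete with a small sketch. The Python below is a simplified
illustration, not the representation defined in Chapter 5: it models a polyvalue as a
mapping from each possible outcome of an unresolved transaction to the value the item
would then have. The `Polyvalue` class, its `apply` method, and the reorder thresholds are
all hypothetical.

```python
class Polyvalue:
    """A value that depends on the unknown outcome of an interrupted transaction."""

    def __init__(self, alternatives):
        # alternatives: {description of outcome: value under that outcome}
        self.alternatives = dict(alternatives)

    def apply(self, fn):
        """Apply fn to every alternative.

        If all alternatives give the same result, the uncertainty disappears
        and an ordinary value is returned; otherwise the result is itself a
        polyvalue.
        """
        results = {cond: fn(value) for cond, value in self.alternatives.items()}
        distinct = set(results.values())
        if len(distinct) == 1:
            return distinct.pop()
        return Polyvalue(results)


# Quantity on hand is 70 if the interrupted transaction T commits, 100 if it aborts.
qoh = Polyvalue({"T commits": 70, "T aborts": 100})

# A reorder decision that depends only loosely on the exact quantity.
decision = qoh.apply(lambda quantity: quantity < 50)
```

Here `decision` is an ordinary value, because the order would not be placed under either
outcome. Only a threshold lying between the two alternatives would leave the decision
uncertain.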



Notice also that most of the transactions would not propagate polyvalues that have
been introduced into the data base. The P transactions and R transactions do not propagate
information among items in the data base, while the other two types of transactions may
propagate a polyvalue to at most one new item. This means that if a polyvalue is
introduced, it will not cause uncertainty to be propagated through the data base.

Whether or not the polyvalue mechanism should be used for this application
depends on the actual cost of implementing polyvalues (in terms of the extra checking that
must be performed in the course of performing a transaction to handle the possibility of
polyvalue inputs), and the concern for reliable operation. The cost of implementing
polyvalues is not likely to be high, but the benefits are likely to be small, as so little locking is
performed in this implementation as to make it unlikely that a polyvalue will ever be
produced.

6.3 Comparison with Other Mechanisms

Several other mechanisms could be used for performing synchronization of the
transactions in this example. This section briefly compares some of the other mechanisms
that have appeared in the literature with the solution described above.

6.3.1 Comparison with SDD-1

As this example is derived from one used for the SDD-1 system for synchronization
of distributed data bases, it seems natural to begin any comparison with SDD-1. This
discussion presumes that the reader is basically familiar with the SDD-1 mechanism and the
solution to this problem using SDD-1.



Using the analysis and protocols of SDD-1, one concludes that the P transactions and
the O transactions can be performed by the simplest (p1) protocol. This protocol requires no
locking and in fact closely resembles the protocol used to perform those transactions in the
solution described above. The other two transaction classes, however, require the p3 protocol
of SDD-1. This protocol performs locking, by forcing the participating data managers to
perform transactions in time-stamp order. Thus SDD-1 locks for two of the four transaction
classes, while my mechanism locks for only one. The reason that two transaction classes
require locking in SDD-1 is that the analysis techniques used by SDD-1 do not recognize
that the R transactions actually have three independent components. While these
components must be performed atomically with respect to other transactions, there is no flow
of information among the three components, thus they can occur in any order with respect to
each other. The more fine-grained analysis used in the mechanism of this thesis discovers
this fact, which allows these transactions to be performed without locking.

The locking protocol used by SDD-1 is similar in cost to that used in this thesis, if
the SDD-1 mechanism is implemented simply and without regard to failures. Both involve
sending a message to each of the data managers which will be involved in the transaction,
requesting that the data manager set aside some time to perform the transaction, and then
performing the transaction with as many additional messages as are needed to move the
data.

The robust implementation of the SDD-1 protocols [Hammer78], however, is very
complicated, and may involve many extra messages. It appears to be quite difficult to be
sure that the transactions are performed in timestamp order without allowing a failure
to stop all transaction processing.



The robust implementation of the SDD-1 protocols attempts to minimize the
probability that a failure will make data inaccessible through the use of abortable locking,
and a voting strategy to determine when a transaction passes its commit point. Using these
techniques may greatly increase the number of messages that must be sent and the processing
needed to implement transactions using the locking protocols. In contrast, using the
polyvalue mechanism to increase the availability of data does not greatly
increase the cost of performing transactions if no failures occur. Only after a polyvalue has
been created does the mechanism begin to be more expensive.

In summary, the protocols used in this thesis are likely to be slightly less costly (in 
terms of processing power, and messages sent) to achieve a comparable level of robustness 
than the protocols of SDD-1. This does not mean that my mechanism is always less costly 
than SDD-1, as the cost of both mechanisms depends strongly on the application.

6.3.2 Comparison with Gray's Locking Strategies

Another distributed concurrency control mechanism is described in a set of notes by
Gray [Gray78]. These notes describe a mechanism in which a set of sites, each of which is
capable of synchronizing local transactions, can communicate to perform multi-site
transactions atomically. This mechanism could be used for this example, by assigning the
items in the data base to various sites.

The protocols used by Gray require locking whenever two or more sites are involved
in one transaction. Thus the S and R transactions would require locking in this example.
The locking mechanisms proposed by Gray are a mechanism for recording the locks held by
each transaction at each site, and a deadlock detection mechanism to escape from settings of
locks in which no transaction can proceed. The cost of setting the locks needed to perform a
transaction is similar to the locking mechanisms of SDD-1 and this thesis.

The problem of deadlock detection, however, adds to the cost of Gray's scheme.
Deadlock detection requires analysis of the sets of locks held by all transactions at all sites,
and may be quite costly in a large system. Gray suggests that deadlock detection can be
partitioned, so that deadlocks among small groups of sites can be detected more rapidly and
with less computation than deadlocks involving a large number of sites. This strategy is
likely to work reasonably well in this application, as each transaction involves only a small
number of sites. Deadlock detection still represents an additional cost in operating the
system, over that of using the protocols of SDD-1 and this thesis which use pre-analysis of
the transactions to avoid deadlock situations.
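
To give a sense of the analysis that deadlock detection involves, the following Python
sketch searches a wait-for graph for a cycle using a depth-first search. It is a generic
textbook technique shown under assumptions, not the algorithm of Gray's notes: the
`find_deadlock` function and the shape of its input (each transaction mapped to the set of
transactions holding locks it is waiting for, already gathered from all sites) are
hypothetical.

```python
def find_deadlock(waits_for):
    """Return a list of transactions forming a cycle in the wait-for graph, or None.

    waits_for maps each transaction to the set of transactions that hold locks
    it needs.
    """
    UNSEEN, ACTIVE, DONE = 0, 1, 2
    state = {}
    path = []

    def visit(txn):
        state[txn] = ACTIVE
        path.append(txn)
        for holder in waits_for.get(txn, ()):
            holder_state = state.get(holder, UNSEEN)
            if holder_state == ACTIVE:
                # holder is already on the current path: a cycle has closed.
                return path[path.index(holder):]
            if holder_state == UNSEEN:
                cycle = visit(holder)
                if cycle:
                    return cycle
        path.pop()
        state[txn] = DONE
        return None

    for txn in list(waits_for):
        if state.get(txn, UNSEEN) == UNSEEN:
            cycle = visit(txn)
            if cycle:
                return cycle
    return None
```

The search itself is cheap; the cost noted above lies in collecting the lock information
from every site before such a search can be run, which is why partitioning detection among
small groups of sites is attractive.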



6.4 Summary 

This application (the distributed supermarket inventory system) is typical of the 
kinds of distributed information systems which this thesis addresses. The analysis shows 
that the transactions exhibit a strong degree of locality of reference, and that most of the 
transactions can be implemented without locking. The choice of which of the data base 
organizations and synchronization hierarchies to use for this application depends on the 
concern for reliability, and the desire to maintain flexibility to perform transactions other 
than those initially planned. The overhead of synchronization in the organization in which 
the hierarchy parallels the hierarchical organization of the locations is very small, as very 
few transactions require locking, and no extra messages are used for the synchronization of 
transactions which do not require locking. 



The polyvalue mechanism described in Chapter 5 can be used in this application
to minimize the probability that a failure will delay transactions.

The implementation of this application using the techniques of this thesis was
compared with two other distributed data base concurrency control mechanisms. This
comparison points out that the techniques of this thesis are likely to be less costly than the
other mechanisms in achieving a comparable level of robustness.



In conclusion, the synchronization mechanism of this thesis can be seen to be efficient
and robust for this application. At the same time, the mechanism allows a great deal of
flexibility, letting the system designer trade off the desire for reliability and efficiency against
the ability to incorporate unplanned transactions easily.



Chapter 7
Conclusions and Areas for Further Research



This thesis has presented a model of synchronization of transactions in a distributed
information system, and several mechanisms for providing such synchronization. This
chapter summarizes the important contributions of the thesis to this field, and suggests some
areas for further investigation.

7.1 Summary of Thesis Work 

The work of this thesis has concentrated in two areas: development of a model of 
computation in a distributed information system, and development of specific mechanisms for 
concurrency control in such a system. The major ideas of the thesis in each of these areas 
are summarized below. 

7.1.1 A Model for Distributed Computing 

The process model of distributed computing presented in Chapter 2 is a framework
in which computation in a distributed information system can be discussed. This model
specifies that the effects of site failures or communication failures are lost or delayed
messages. The thesis discusses techniques that could be used to provide an implementation
of the concepts in the process model for which the effects of failures are confined to these
specifications.



I developed two basic strategies for synchronizing transactions described in the
process model: locking and sequencing. Sequencing achieves the goal of partial operability
defined in Chapter 1, while locking may allow a failure of one site to delay a transaction that
is local to some other site. In Chapter 4, I demonstrated that locking was needed to correctly
coordinate some groups of transactions. Chapter 5 presented an argument for the claim that
any implementation of locking may allow a transaction to be delayed by a failure at some
other site. Taken together, these results show that it is not possible to achieve the goal of
partial operability while correctly synchronizing all transactions, given the possible effects
of failures as specified in the process model.

7.1.2 A Hierarchical Concurrency Control Mechanism

Chapters 3 and 4 of this thesis present a hierarchical mechanism to coordinate
transactions. This mechanism has several interesting properties. First, it is quite simple to
describe, and relatively simple to prove correct. Many of the synchronization mechanisms
described in the literature are quite complex, and correctness proofs for these mechanisms are
very long and complicated. The simple implementations of the protocols described in
Chapters 3 and 4 suggest that my mechanism would be relatively simple to implement in an
actual distributed information system.

A second important property of my scheme is that it performs well when the patterns
of accesses to items in the distributed data base show a strong locality of reference. The
mechanism can be tailored so that frequent transactions require little overhead for
synchronization. The mechanism can also be designed so as to avoid locking whenever
possible. The thesis describes analysis techniques that can be used to assess the cost of
performing the most frequent or important transactions. This analysis can be used to choose
an organization of the data and the synchronization network so that these transactions are
performed efficiently and reliably. The mechanism provides correct synchronization for all
transactions, even those not anticipated in the design; however, unanticipated transactions
may be much more costly to perform and more likely to be delayed by failures.

Chapter 5 of the thesis presents a novel solution to the problem of unavoidable
delays caused by failures during the execution of a transaction using locking. The polyvalue
mechanism in many cases allows a transaction to be run even if the values in the data base
accessed by that transaction cannot be determined exactly, due to a failure. With this
mechanism, important transactions that must be performed promptly are, in many cases, not
delayed by the locks set by other transactions. The protocols presented for manipulating
polyvalues again are most efficient if most transactions are local to one or to a small
number of sites. This assumption of locality of reference appears to be true of many
applications.

The model and mechanisms of this thesis shed some light on what is a very poorly
understood area of computer science. They do not by any means provide a complete solution
to the problem, and in fact suggest several interesting research problems.

7.2 Areas for Further Research

There are a number of ways in which the work of this thesis could be extended to 
provide a better understanding of synchronization in distributed systems. These include the 
investigation of the applicability of the process model to real physical systems, further 
investigation of applications, better techniques for constructing the synchronization 
hierarchy, and implementation of the protocols. 



7.2.1 The Applicability of the Process Model

The results of this thesis are based on the semantics of failures in the process model.
In particular, many of the results are based on the notion that there is no single event
detectable by two processes simultaneously. While I strongly believe that this is true of any
physical system, there may in fact exist ways of implementing communication in which the
sender of a message can know for sure whether or not that message was received. If this is
the case, one might be able to implement abortable locking as described in Chapter 5,
contrary to the arguments advanced in that chapter and in several other papers in the
literature.

Another related area for investigation is that of ways of including the effects of
failures in a model of computation. In the process model, I assumed that a failure could
delay any message indefinitely. It is possible that some less pessimistic assumption about
failures would lead to a workable model for a distributed information system. One might,
for example, assume that no more than N sites fail concurrently. While it would be
impossible to implement a system so as to conform to this assumption, if the probability that
the assumption is violated is sufficiently small, then a distributed information system based
on the assumption may be acceptably reliable, and much simpler to implement.
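
How small the probability of violating such an assumption might be can be estimated with a
short calculation. The Python sketch below is illustrative only and rests on an assumption
the thesis does not make: that each site is failed at a given instant independently of the
others and with the same probability p. Under that assumption the number of failed sites is
binomially distributed. The function name and the figures used are hypothetical.

```python
from math import comb


def prob_more_than_n_failed(sites, n, p):
    """Probability that more than n of `sites` sites are failed at one instant,
    assuming each site is independently failed with probability p."""
    at_most_n = sum(
        comb(sites, k) * p**k * (1 - p) ** (sites - k) for k in range(n + 1)
    )
    return 1.0 - at_most_n


# Ten sites, each failed 1% of the time, with the assumption "no more than 2
# sites fail concurrently".
violation = prob_more_than_n_failed(sites=10, n=2, p=0.01)
```

With these figures the assumption would be violated roughly one time in ten thousand.
Failures caused by a shared event, such as the loss of a communication link serving several
sites, are not independent and would make the true figure higher.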

7.2.2 Applications 

This thesis makes extensive use of the assumption that applications of a distributed
information system will exhibit locality of reference in their use of data. This assumption
appears to be true of some planned applications; however, more careful statistics of actual
applications may be needed to confirm the validity of this assumption. We may in fact
discover that the flexibility of a distributed information system will encourage different
organizations of information that do not exhibit the same patterns.

7.2.3 Analysis of Transactions 

The thesis presented techniques for determining the cost of performing a transaction
(in terms of the number of messages required) using various logical organizations of the
data. Guidelines for choosing the synchronization network and data base organization,
given a description of the most frequent transactions to be performed, were given. These
guidelines are not, however, detailed algorithms that design the synchronization mechanism.
Considerable effort and ingenuity may be needed in choosing an optimal, or near optimal,
synchronization network, and in choosing the assignment of data items to data manager
processes. These problems are similar to many others that occur in managing resources in a
computer system, and it would seem likely that good algorithms for designing a distributed
information system using the hierarchical synchronization mechanism of this thesis could be
derived.

7.2.4 Implementation of the Protocols 

Finally, the thesis presented only a few simple implementations of the synchronization
protocols. Improvements on these implementations can no doubt be made. One area that
seems of particular interest is using the hierarchical synchronization protocols in a computer
tailored to managing data. The hierarchical synchronization mechanism presented here fits
well with the proposed models of memory for a data base machine. This mechanism may
lead to a very efficient implementation of such a machine.



Another implementation issue that bears further investigation is the design of a
communication network that supports atomic broadcasting. In Chapter 3, techniques for
using a broadcast network to implement the message forwarder protocol were presented.
The need for a coordinator site is a weak point in this scheme, as failure of the coordinator
site stops all atomic broadcasting. It is possible that the function of the coordinator could be
implemented in each site's network interface, in such a way that a single site failure would
not stop broadcasting. If the network were designed specifically to support atomic
broadcasting, it is likely that broadcasting could be made efficient and highly reliable.

Several mechanisms have been developed to maintain several identical copies of a
redundant data base. As noted in Chapter 4, this problem appears to be somewhat simpler
than that of synchronizing transactions in a distributed information system, because each site
has a copy of every item. The techniques that have been developed to manage replicated
data could be applied to maintaining copies of the process state and message queues of a
process, in order to obtain a more robust implementation of a process. Application of
techniques for maintaining duplicate data bases to the implementation of processes would be
an interesting research problem, and would lead to a more robust implementation of the
concurrency control mechanism presented in this thesis.

7.3 Summary 

This chapter has presented a summary of the results of this thesis, and presented
some of the open questions that this thesis leaves unanswered. Many of the conclusions of
this work are not decisive; however, I hope that my work has shed some small degree of light
on a very murky and poorly understood field.






References

[Akkoyunlu75] Akkoyunlu, E.S., Ekanadham, K., Huber, R.V., "Some Constraints and
    Tradeoffs in the Design of Network Communications," Proc. Fifth Symposium
    on Operating System Principles, November, 1975. (Operating Systems
    Review, Vol. 9, No. 5).

[Alsberg76] Alsberg, P.A., Belford, G.G., Day, J.D., Grapa, E., "Multi-Copy Resiliency
    Techniques," CAC Document #202, May, 1976.

[Atkinson78] Atkinson, R.A., and Hewitt, C., "Specification and Proof Techniques for
    Serializers," Draft, March 20, 1978.

[Bernstein77] Bernstein, P.A., Shipman, D.W., Rothnie, J.B., and Goodman, N., "The
    Concurrency Control Mechanism of SDD-1: A System for Distributed Databases
    (The General Case)," Computer Corporation of America technical report
    CCA-77-09, December 15, 1977.

[Cerf74] Cerf, V.G. and Kahn, R.E., "A Protocol for Packet Network Interconnection,"
    IEEE Transactions on Computers, May, 1974.

[DOliveira77] d'Oliveira, C.R., "An Analysis of Computer Decentralization," M.I.T.
    Laboratory for Computer Science Technical Memo TM-90 (October, 1977).

[Dennis75] Dennis, J.B., "First Version of a Data Flow Procedure Language," M.I.T.
    Laboratory for Computer Science Technical Memo TM-61, May 1975.

[Dijkstra68] Dijkstra, E.W., "The Structure of the THE Multiprogramming System,"
    CACM 11, 5, pp. 341-346 (May 1968).



[Farber72] Farber, D.J., Larson, K., "The Structure of a Distributed Computer System -
    Communications," Proceedings of the Symposium on
    Computer-Communications Networks and Teletraffic, Microwave Research
    Institute of Polytechnic Institute of Brooklyn, 1972.

[Gray75] Gray, J.N., Lorie, R.A., Putzolu, G.R., and Traiger, I.L., "Granularity of Locks
    and Degrees of Consistency in a Shared Data Base," IBM Research Report
    RJ 1654, September, 1975.

[Gray78] Gray, J.N., "Notes on Data Base Operating Systems," Operating Systems: An
    Advanced Course, Volume 60 of Lecture Notes in Computer Science,
    Springer-Verlag, 1978, pp. 393-481.

[Halstead78] Halstead, R.H., Multiple Processor Implementations of Message-Passing
    Systems. S.M. Thesis, M.I.T. Department of Electrical Engineering and
    Computer Science. January, 1978.

[Hammer78] Hammer, M., Reliability Mechanisms in SDD-1, talk at MIT-LCS. Soon to
    be written up.

[Hewitt76] Hewitt, C., "Viewing Control Structures as Patterns of Passing Messages,"
    M.I.T. Artificial Intelligence Laboratory, AI Memo 410, December, 1976.

[Hewitt77] Hewitt, C. and Baker, H., "Laws for Communicating Parallel Processes," Proc.

[Hoare74] Hoare, C.A.R., "Monitors: an operating system structuring concept," CACM 17,
    5 (October 1974), pp. 549-557.

[Johnson75] Johnson, P.R. and R.H. Thomas, "The Maintenance of Duplicate Databases,"
    ARPANET NWG/RFC #677, January, 1975.

[Lampson76] Lampson, B. and Sturgis, H., "Crash Recovery in a Distributed Data Storage
    System," Xerox Palo Alto Research Center, Ca. November, 1976. To appear
    in CACM.



181 - 



[Liskov77] Liskov, B.H., et al., "Abstraction Mechanisms in CLU," CACM 20, 8 (August 
1977), pp. 564-576. 



[Metcalfe76] Metcalfe, R.M., et al, "Ethernet: Distributed Packet Switching for Local 
Computer Networks," CACM 19, No. 7, pp. 395-404. July, 1976. 



[Randell78] Randell, B., Lee, P.A., ai$ Tre^eayen, PJC^ "Reliability Igsujes fa Computing 
System Design,* ACM Computing Surveys 10, 2 {juM 1978), pp J23-166. 

[ReedTS] Reed, D.P^ "Protocols for the LCS Network," LCS-LNN «*V November, S76. 



[Reed78] Reed, D.P., "Naming and Synchroniiation la,, a Decentralized Computer 

System^ MIT. Technical Report 205, September, 1978. 



tRothnie77] Rothnie, J.B. Bernstein, PA, Ckwdman, R, and PapadimttrioB, C A^ The 
Redundant Update Methodology of SDD4: A System for Distributed 
Databates," GSmpotier^erpo^^ Report, February, 

1977. 



[Saltier78] Saltzer.jH, "Research Problems of Decentralized Systems with Largely 
Autpnomoifsl^esV!^ 



[Stearns76] Stearns, R„ et al, "Concurrency control for database systems," IEEE 
Symposium of Foundations of Computer Science CH 113 3-8 €, October, 1976, 
pp. 19-32. 



CThomas76] Thomas, R. H, "A Solution to the Update Problem for Multiple Copy Data 
Bases Which Uses Distributed Control," BBN Report •3340, July. 1976. 



-182* 
Appendix A
Proofs of the Protocols



This appendix gives a more formal definition of some of the concepts in the body of
this thesis and proofs of some of the results. For simplicity of description, the definitions and
proofs in this appendix are for the version of the message forwarder protocol in which each
broadcast message is sent immediately to a process which is a common ancestor of all of the
receivers and of the sender. In the actual implementation of the protocol described in
Chapter 3, each message may travel up the hierarchy in several hops. This difference does
not affect the results proven here, as long as only the message receptions which take place in
the distribution of a broadcast after that broadcast has reached the common ancestor are
used in determining the < ordering. This condition is consistent with the use made of the
protocol by the concurrency control mechanism presented in Chapter 4, in which the process
steps that take place in the distribution of a message after it has reached the common
ancestor are the only ones which have effects that could be observed by process steps related
to other messages.

A.1 Formalization of Atomic Broadcasting

Definition: For each process p, there is an ordering $<_p$ on messages sent to p such
that $m_1 <_p m_2$ iff $m_1$ was received at p before $m_2$ was received at p.
Each message sent to p is included in $<_p$ when it is received.

Definition: A broadcast $B = \{[b_i, p_i] \mid b_i$ is a message which is to be sent to
process $p_i$ as a part of $B\}$.

Definition: For each message m, let $B(m)$ be the broadcast message from which m
was derived. The set $\{m \mid B(m) = B\}$ for some particular broadcast B
then contains both the component messages of B and any other
messages that are received in distributing those components.

Definition: For broadcast messages $B_1$ and $B_2$, there is an ordering <, which is
defined as $B_1 < B_2$ if $\exists\, p, b_1, b_2$ such that $B(b_1) = B_1$, and $B(b_2) = B_2$,
and $b_1 <_p b_2$.

Definition: Broadcasting is atomic iff < is cycle free. 
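
The definitions above can be exercised on a small example. The following is a minimal sketch in Python, not part of the protocol itself: it assumes that each process keeps a log of the broadcasts from which its received messages were derived, in reception order. The names `broadcast_order` and `is_atomic` and the shape of the `receptions` argument are illustrative assumptions.

```python
from collections import defaultdict

def broadcast_order(receptions):
    """Build the < relation on broadcasts from per-process reception logs.

    receptions maps a process name to the list of broadcast identifiers B(m)
    of the messages it has received, in the order of reception.
    """
    edges = defaultdict(set)
    for log in receptions.values():
        for i, earlier in enumerate(log):
            for later in log[i + 1:]:
                if earlier != later:
                    edges[earlier].add(later)   # B(earlier) < B(later)
    return edges

def is_atomic(receptions):
    """Return True iff the < relation built from the logs is cycle free."""
    edges = broadcast_order(receptions)
    WHITE, GREY, BLACK = 0, 1, 2
    colour = defaultdict(int)

    def visit(node):
        # Depth-first search; reaching a GREY node again means a cycle.
        colour[node] = GREY
        for nxt in edges.get(node, ()):
            if colour[nxt] == GREY:
                return False
            if colour[nxt] == WHITE and not visit(nxt):
                return False
        colour[node] = BLACK
        return True

    return all(colour[n] != WHITE or visit(n) for n in list(edges))
```

Two processes that receive components of B1 and B2 in opposite orders give B1 < B2 and B2 < B1, which is exactly the cycle that atomic broadcasting rules out.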

Definition: Let $\to$ be the synchronization network relationship, which is a
relationship among pairs of processes that is subject to the following
constraint. The graph defined by $\to$ must have no directed or
undirected cycles. Thus, there can not exist a set of three or more
processes $p_1, \ldots, p_n$, all distinct, such that either $p_i \to p_{i+1}$ or $p_{i+1} \to p_i$
for all $i < n$, and $p_1$ and $p_n$ are related by $\to$.
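
The constraint on the synchronization network amounts to requiring that the links form a forest when their direction is ignored. A minimal sketch of such a check in Python follows; the function name `is_valid_sync_network` and the representation of links as pairs are assumptions made for illustration, and the union-find technique is a standard one rather than anything prescribed by the thesis.

```python
def is_valid_sync_network(links):
    """Return True iff the links contain no directed or undirected cycle.

    links is an iterable of (sender, receiver) pairs, one per related pair
    of processes. Direction is ignored: a graph with no undirected cycle
    cannot contain a directed one either.
    """
    parent = {}

    def find(x):
        # Union-find lookup with path halving.
        parent.setdefault(x, x)
        while parent[x] != x:
            parent[x] = parent[parent[x]]
            x = parent[x]
        return x

    for a, b in links:
        root_a, root_b = find(a), find(b)
        if root_a == root_b:
            return False        # a and b were already connected: cycle
        parent[root_a] = root_b
    return True
```

A simple hierarchy passes the check, while a "diamond" in which one process can be reached along two different paths does not.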

With these definitions, we can now define the message forwarder protocol by defining the
process step specifications of the forwarders.

Definition: The process step specification of a message forwarder f is defined by
a function $F(B)$:

$F(B) = \{[X(B,p), p] \mid (f \to p) \wedge$ the set $X(B,p)$ is non-empty$\}$

where B is the message received in a process step, $F(B)$ is the set of
pairs, each of which lists one of the output messages produced by that
step and its destination process, and $X(B,p)$ is a set describing the
contents of one of the output messages of the process step, which is
constructed as follows:

$X(B,p) = \{[b,q] \mid [b,q] \in B \wedge p \to q\}$

Definition: Communication between message forwarders obeys the constraint of
Sequencing, which can be stated as follows. If $b_1 <_f b_2$ for messages
$b_1$ and $b_2$ and message forwarder f, and if $[b'_1, p] \in F(b_1)$ and $[b'_2, p] \in F(b_2)$,
where F is the protocol specification for f, then $b'_1 <_p b'_2$
after both $b'_1$ and $b'_2$ have been received by p.
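
A single process step of a forwarder can be sketched as follows. This is an illustration under stated assumptions, not the implementation of Chapter 3: the network is given as a hypothetical `children` mapping, and a component [b, q] is placed in the output for p whenever q is p itself or lies below p in the hierarchy. That reading of the membership condition for X(B, p) is an assumption of the sketch.

```python
def descendants(children, p):
    """Return p together with every process below p in the hierarchy."""
    seen, stack = set(), [p]
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(children.get(node, ()))
    return seen

def forward(children, f, broadcast):
    """One process step of forwarder f, returning F(B) as (X, p) pairs.

    broadcast is a set of (component, destination) pairs. An output message
    is produced for a child p only when its contents X are non-empty.
    """
    outputs = []
    for p in children.get(f, ()):
        reach = descendants(children, p)
        x = {(b, q) for (b, q) in broadcast if q in reach}
        if x:
            outputs.append((x, p))
    return outputs
```

Sequencing is not modelled here; it would correspond to delivering the outputs for each child over a first-in, first-out channel.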



A.2 Proof of Atomic Broadcasting

To show that the protocol defined above distributes the components of a broadcast
message atomically, we must show that the < ordering in any system state reachable by an
execution of steps specified by the protocol is cycle free. To do so, I will show that for any
step permitted by the protocol, if the < ordering is cycle free before that step, then it will be
so after also. Because the starting state (before any messages have been received) has an
empty < ordering, which is trivially cycle free, by induction in any state reached by following
the protocol the < ordering will be cycle free.

Before proceeding with the proof, I would like to make an observation about the
protocol that will be useful in the proof. If two processes p and q have each received a
message derived from the same broadcast B, then there is a path in the graph defined by
the $\to$ relationship connecting p and q. Each process on that path has also received a
message derived from B. This is true because the graph has no cycles, thus there is only one
path between p and q, and the protocol allows each broadcast to enter the network at one
point, from which it must reach all recipients. It is not possible for two processes to have
seen messages from a broadcast B unless all of the processes on the path between those two
processes have also seen messages from B.

In each step of the protocol, some message m is received at some process p, possibly
adding ordering relationships of the form $B(m') < B(m)$ for messages m' previously received
at p. We must show that introducing the relationship $B(m') < B(m)$ for any message m'
previously received at p can not introduce a cycle. The proof will be divided into 3 cases,
depending on the origin of m and m'.

CASE 1: m was not sent from a process P such that $P \to p$. In this case, m is the initial
entrance of broadcast message $B(m)$ into the network of message forwarders. Therefore,
before the reception of m, there were no ordering relationships in < involving $B(m)$, so that
the reception of m could not introduce a cycle in <.

CASE 2: m and m' were both sent by some process P such that $P \to p$. In this case, the
process P must have received a message M such that $B(M) = B(m)$, and a message M' such
that $B(M') = B(m')$ in the process steps which produced m and m'. Because of the
sequencing of messages between P and p, the messages m and m' must have been sent by P
in the same order that they were received at p. Thus the ordering relationship $B(m') < B(m)$
held before the reception of m (because of the receptions of M and M' at P) and therefore,
by the assumption that no cycle existed before the reception of m, no cycle is created.

CASE 3: m was sent by some process P for which $P \to p$, but m' was not sent by P. This is
the most difficult case. To show that no cycle is introduced by the reception of m in this
case, I will assume that such a cycle is created and show that this assumption leads to a
contradiction or a violation of the conditions of the protocol.

Assume that the reception of m creates a cycle in the < ordering. Then prior to the
reception of m, it must be the case that there is a sequence of broadcast messages
$\langle B_1, \ldots, B_n \rangle$, such that $B_i < B_{i+1}$ for $1 \le i < n$, and $B_1 = B(m)$, and $B_n = B(m')$.
Consider now the set of processes $p_1, \ldots, p_{n-1}$ at which these broadcasts were ordered. Now
by the observation noted above, there exists a path in the network from each of these
processes to the next process in the chain. Also, there exists a path between $p_1$ and P, as
both have received messages derived from $B(m)$, and there exists a path between $p_{n-1}$ and p,
as both have received messages derived from $B(m')$. Thus because of this chain of
broadcasts, there must exist a path between P and p. If the path implied by the chain of
broadcasts does not go through the direct link between P and p, then we have discovered a
cycle in the synchronization network, violating the conditions of the protocol. I will show
that if that path does go through the direct link between the two processes, then either the
sequencing of messages between P and p has been violated, or a cycle was present in the <
relationship before the reception of m at p.

If the path between P and p implied by the sequence of broadcasts includes the
direct link, then some broadcast, call it $B_j$, must have been seen by both P and p, and
furthermore, we know that P must have received a message derived from $B_j$ and as a result
sent a message to p. The broadcasts $B_j$ and $B(m)$ must have been ordered by
receptions at P, with $B(m)$ received first, as required by the chain of
broadcasts. Now by sequencing, p can not yet have received a
message derived from $B_j$, which is impossible, as we know that a message derived from $B_j$
must have been received at p. This contradiction demonstrates that it is impossible for a
cycle to arise from the reception of m at p if the only path between P and p is the direct link.
Thus another distinct path must exist in the synchronization network between P and p,
forming a cycle with the direct link.

This completes the proof of the third and last case, thus the synchronization protocol
of Chapter 3 correctly coordinates atomic broadcasts.


A.3 Correct Relative Sequencing of Broadcasts

In this section, I demonstrate that the protocol described in Chapter 3 for atomic
broadcasting correctly orders atomic broadcasts such that a process never receives some
message m before receiving some message that m should follow. The proof will be for the
simplified case in which the $\to$ relationship is a simple hierarchy.



Recall that the "should follow" relationship among messages was defined as: Each
message m sent by a process p in a process step s should follow a message m' whenever:

a) There is a message m" received by p in process step s or in a step
that preceded s, and m' and m" are components of the same
broadcast.

OR

b) There is a message m" received by p in step s or in a step that
preceded s, and m" should follow m'.
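
The two rules can be applied mechanically to a recorded execution. The sketch below is a minimal illustration in Python, assuming that the execution is given as a list of process steps in the order they occurred and that the broadcast of every message is known. The names `should_follow`, `steps`, and `broadcast_of` are hypothetical.

```python
def should_follow(steps, broadcast_of):
    """Compute, for each sent message, the set of messages it should follow.

    steps is a list of (process, received, sent) triples in execution order,
    where received is a message or None and sent is a list of messages.
    broadcast_of maps every message to the broadcast it is a component of.
    """
    components = {}                      # broadcast -> its component messages
    for message, broadcast in broadcast_of.items():
        components.setdefault(broadcast, set()).add(message)

    received_by = {}                     # process -> messages received so far
    follows = {}                         # message -> messages it should follow
    for process, received, sent in steps:
        seen = received_by.setdefault(process, [])
        if received is not None:
            seen.append(received)
        must = set()
        for m2 in seen:
            must |= components[broadcast_of[m2]]   # rule (a)
            must |= follows.get(m2, set())         # rule (b)
        for m in sent:
            follows[m] = set(must)
    return follows
```

In the test below, process p receives a1 (one component of broadcast A) and then sends b1, so b1 should follow both components of A. A later message sent by a receiver of b1 inherits those obligations through rule (b).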

A key factor in this definition is that if m should follow m', then some process must have
received a message derived from $B(m')$. Using the message forwarder protocol of Chapter 3,
if any process has received a message derived from a broadcast message B, then for any
process p, if p will eventually receive a message derived from B, then that message must be
represented in some message awaiting reception at p or at one of the ancestors of p. This is
true because each broadcast message enters the hierarchy once, and all components flow
downward in the hierarchy from the point of entry. No component can be received before
the message is entered in the hierarchy, and once a message is entered, each component is
either above or at its ultimate destination.

I will now prove the claim that for any message m, there can be no message m' such
that m should follow m', and the message containing the component containing m' is above
that for m in the hierarchy. This, combined with the observation about the message
forwarder protocol described in the previous paragraph, is sufficient to prove that when a
message m is received at a process p, no message m' that m should follow will subsequently
be received at p.



The proof of this claim will be by induction. Initially, the claim is true, as there are
no messages. We must show that in any state for which the claim is true, the reception of a
message m at a process p as specified by the protocol cannot cause the claim to become false.
There are two cases, one where m was sent by the parent of p and one where m was sent by
some other process.

CASE 1: m was sent by some process P such that $P \to p$. When m was sent, all of the
messages that m should follow must have been in the hierarchy (or already received) and not
above P in the hierarchy. Therefore, because of the sequencing of messages between p and
P, messages that m should follow will not be above p in the hierarchy when m is received at
p.

CASE 2: m was not sent by the parent of p. In this case, we must consider the messages that
m should follow. These are all components of each broadcast message B for which s, the
sender of m, had received a component prior to the sending of m. The claim was true when
m was sent, so no message that m should follow, from any of these broadcasts, could have
been above s at the time that m was sent. Therefore, because p must be an ancestor of s,
there are no messages that m should follow that are above p when m is received at p.

This completes the proof of the claim, and thus the proof that broadcasts are
sequenced correctly by the message forwarder protocol using a hierarchical synchronization
network.

While the proof of correct sequencing of messages according to the "should follow"
relationship is somewhat involved, the principle of operation of the protocol is simple. Each
message pushes the messages that it should follow along paths in the hierarchy as it goes.
The protocol works because there is only one path between any two processes in the
hierarchy, so that no message can sneak ahead of its place in the sequence of messages going
to some destination process.



Appendix B
An Analysis of the Propagation of Polyvalues



A major area of concern regarding the polyvalue scheme presented in Chapter 5 of
this thesis is that failures may cause the number of items having polyvalues to become large.
This would waste storage space and cause a great deal of extra computation by the
polytransactions acting on the data base. This appendix presents a simple model of the
dynamic behavior of a distributed information system using the polyvalue scheme. An
analysis is given to show that with reasonable parameters for the expected transactions and
failure rates, the number of polyvalues in the data base remains quite small. A simulation of
the system agrees well with these predicted results.

B.1 A Model for the Creation and Deletion of Polyvalues

At any point in the execution of a distributed information system, we can calculate
the expected rates of creation and deletion of polyvalues, based on some assumptions about
the expected transactions, and the failure characteristics of the system. These rates can be
expressed as:

Creation Rate = Propagation Rate + New Failure Rate
Deletion Rate = Recovery Rate + Propagation Overwrite Rate

Propagation rate is the rate at which polytransactions install polyvalues for their results in
items which previously held simple values. New failure rate is the rate at which updates in
progress are suspended, causing polyvalues to be installed. Recovery rate is the rate at
which failures which caused polyvalues to be produced are recovered. Finally, propagation
overwrite rate is the (probably very low) rate at which an item with a polyvalue is updated
by a transaction producing a simple value. This occurs only if a transaction produces an
output that is independent of the previous value of the updated item.

With some additional terminology, we can develop more precise expressions for the
creation and deletion rates. I will use the following terminology to describe the data base,
the transactions, and the failure characteristics of the system:

U - Update frequency (Updates/Second). This is the rate at which
updates to the data base (not transactions) are made. U can be
calculated from the overall transaction rate, the percentage of
transactions which make updates, and the average number of
updates per transaction.

W - The probability of an update being delayed by failure. W can
be computed from the mean time between failures, the time window
in which an update can be delayed by a failure, and the update
rate.

I - The number of items in the data base.

R - The recovery rate for failures. This is the reciprocal of the
mean time to recover failures (in seconds). The description of
failure recovery in this way assumes that the time to recover
failures is exponentially distributed with mean of 1/R.

Y - Update independence. This parameter is the probability that
the new value of an updated item will not depend on its exact
previous value. A value of 0 for Y indicates that the new value of
an updated item always depends on its previous value.

D - Dependence of updated items on other data items. This
parameter specifies, on the average, the number of data items in the
data base on which each update depends.



With these parameters, we can approximate the rates described above. In the
expressions given below for the rates, P represents the number of polyvalues in the data
base. This is a first order approximation in which the proportion of data items in the data
base having polyvalues is assumed to be small, thus terms involving $(P/I)^2$ have been
dropped.

Propagation Rate = $U \cdot D \cdot P / I$

New Failure Rate = $U \cdot W$

Recovery Rate = $P \cdot R$

Propagation Overwrite Rate = $U \cdot P \cdot Y / I$
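
As a quick numerical check, the four rate expressions can be evaluated directly. The following is a minimal sketch in Python; the function name `polyvalue_rates` and the dictionary keys are illustrative choices, not part of the model.

```python
def polyvalue_rates(P, U, W, I, R, Y, D):
    """Evaluate the first-order rate expressions for P polyvalues (per second)."""
    return {
        "propagation": U * D * P / I,            # polyvalues spread by polytransactions
        "new_failure": U * W,                    # updates suspended by a failure
        "recovery": P * R,                       # polyvalues removed by failure recovery
        "propagation_overwrite": U * P * Y / I,  # polyvalues replaced by simple values
    }
```

For example, with P = 100, U = 10, W = 0.0001, I = 100,000, R = 0.001, Y = 0 and D = 1, the creation rate is 0.01 + 0.001 = 0.011 per second while the deletion rate is 0.1 per second, so the number of polyvalues would be expected to fall.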

These terms can be combined to give the expected rate of change of the number of
polyvalues in the data base, yielding:

$$\frac{dP}{dt} = U W + \frac{U D P}{I} - \frac{U P Y}{I} - P R$$

This is a simple linear differential equation for P which indicates that the number of
polyvalues would follow an exponential decay from its initial value to the steady state value,
given that the parameters accurately describe the behavior of the system. The steady state
expected number of polyvalues can be obtained by setting the rate of change equal to zero
and solving for P. From this we obtain:

$$P = \frac{U W I}{I R + U Y - U D}$$

Several non-intuitive properties of this solution should be explained. First, it would
seem that the denominator of this expression can go to zero, causing the steady state expected
number of polyvalues to be infinite or negative. This situation arises when the propagation
rate is equal to or greater than the rate at which polyvalues are removed through failure
recovery or overwritten. If this were the case, we would indeed expect the number of
polyvalues in the data base to become large. In fact, the number of polyvalues in the data
base would grow so large that this simple first order analysis would no longer be correct, and
the number of polyvalues would be limited by second order effects which I have ignored. (I
have, for example, ignored the possibility that an item involved in a failed update or the
target of propagation already has a polyvalue, and thus does not represent a new polyvalue.)
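
The steady state expression, together with the degenerate case just described, can be written as a small function. This is a minimal sketch in Python. The name `steady_state_polyvalues` and the choice to return `None` when the denominator is not positive are assumptions made for illustration.

```python
def steady_state_polyvalues(U, W, I, R, Y, D):
    """Steady state expected number of polyvalues under the first-order model.

    Returns None when the denominator I*R + U*Y - U*D is zero or negative,
    the case in which the first-order analysis no longer applies.
    """
    denominator = I * R + U * Y - U * D
    if denominator <= 0:
        return None
    return U * W * I / denominator
```

With U = 1, W = 0.0001, I = 1,000,000, R = 0.001, Y = 0 and D = 1 this gives 100/999, or about 0.10.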

A second feature of the equation which may seem strange is that it depends in a
non-trivial way on I, the number of items in the data base. This is because the creation and
deletion of polyvalues directly due to failures is not dependent on the data base size, while 
the propagation terms depend on the ratio P/I. If I is very large compared to P, then the 
effect of propagation is small, as the chance that items with polyvalues will be used by 
transactions is small. If, however, the data base is small, then the chance that items with 
polyvalues will be involved in transactions is larger, and the propagation terms become more 
significant. 

Another point to notice about these equations is that they are stable, meaning that if
the current number of polyvalues is larger than the expected number, the expected change in
the number of polyvalues is a decrease. This indicates that if some catastrophe introduces a
large number of polyvalues into the data base, the number should soon decrease to the
expected number, given that the values of W and R are not affected by the catastrophe.
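
Because the rate equation is linear with constant coefficients, it has the standard closed-form solution $P(t) = P_{ss} + (P_0 - P_{ss}) e^{-kt}$, where $k = R + U(Y - D)/I$ and $P_{ss} = UW/k$ is the steady state value. The sketch below evaluates that solution in Python to illustrate the recovery from a large initial number of polyvalues; the function name `polyvalues_at` and the symbol `P0` for the initial count are assumptions introduced here.

```python
import math

def polyvalues_at(t, P0, U, W, I, R, Y, D):
    """Expected number of polyvalues t seconds after starting with P0 of them."""
    k = R + U * (Y - D) / I          # net decay rate of the linear model
    if k <= 0:
        raise ValueError("model is not stable for these parameters")
    p_ss = U * W / k                 # steady state value, equal to UWI/(IR+UY-UD)
    return p_ss + (P0 - p_ss) * math.exp(-k * t)
```

Starting from 500 polyvalues with U = 10, W = 0.0001, I = 1,000,000, R = 0.001, Y = 0 and D = 1, the expected number decays toward the steady state value of about 1.01.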

Table B.1 gives some typical values for P. Several observations can be made about
this data. Decreasing R causes an increase in the number of polyvalues, as would be
expected. Increasing W causes a proportional increase in the number of polyvalues.
Decreasing I causes the number of polyvalues to rise. The parameters Y and D have little
effect, unless the values of the parameters are such that the denominator of the equation for
P is near zero.



Notice that even for reasonably pessimistic failure rates and recovery times, the
number of polyvalues remains quite small. These results indicate that the polyvalue scheme
is feasible in a distributed information system, and that the chances of a combinatorial
explosion of the amount of computation are very small. The next section of this appendix
contains a brief description of a simulation of the use of polyvalues, which shows that the
predictions of this model are reasonably accurate.



Table B.1
Typical Predictions of the Number of Polyvalues in a Database

                   Parameters                          Prediction

    U      W         I           R        Y     D          P

    1    0.0001   1,000,000    0.001      0     1         0.10
    1    0.0001   1,000,000    0.001      0   100         0.11
   10    0.0001   1,000,000    0.001      0     1         1.01
  100    0.0001   1,000,000    0.001      0     1        11.11
   10    0.0001     100,000    0.001      0     1         1.11
   10    0.0001     100,000    0.001      0     5         2.00
   10    0.0001     100,000    0.001      0     7         3.33
   10    0.0001     100,000    0.001      1     1         1.00
   10    0.0001      20,000    0.001      0     1         2.00
   10    0.0001      11,000    0.001      0     1        11.00
   10    0.001    1,000,000    0.001      0     1        10.10
   10    0.005    1,000,000    0.001      0     1        50.50
   10    0.0001   1,100,000    0.0001     0     1        11.00
B.2 Simulation of the Use of Polyvalues

In order to verify that the approximations made in analyzing the above model do
not lead to an inaccurate description of the behavior of the polyvalue system, I constructed a
simulation of the manipulation of polyvalues in a distributed information system which is
based on the above model, but not the approximations made in the analysis.

The simulation assigns a unique identifier to each failure creating a polyvalue, in
order to distinguish them. For each item in the data base, the simulation maintains a vector
containing the identifiers of the pending transactions on which that item depends, referred to
as the state of the item. An item has a polyvalue if its state is non-empty (i.e. if that item
depends on a pending transaction).

Updates are simulated at the rate U. Each such update selects a random integer d
with mean D, and d random items from the data base. Some random item is selected as the
target of the update. The state of the updated item is replaced with a merge of the states of
the selected d items. With probability (1-Y), the previous state of the updated item is also
merged into its new state.

With a probability W, the update is chosen to fail. A failure is simulated by
selecting a new identifier, adding it to the state of the updated item, and selecting a recovery
time for the failure. Recovery times are exponentially distributed, with mean 1/R. When the
recovery time for a failure is reached, the identifier of that failure is removed from all item
states.
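
The procedure described above can be sketched as a small event-driven program. This is a reconstruction under stated assumptions, not the original simulating program: update arrivals are taken to be a Poisson process of rate U, the integer d is drawn uniformly from 0 to 2D (which has mean D), and the result reported is the time-averaged number of items holding polyvalues. The function name `simulate` and its `duration` and `seed` parameters are illustrative.

```python
import heapq
import random

def simulate(U, W, I, R, Y, D, duration, seed=0):
    """Return the time-averaged number of items with polyvalues."""
    rng = random.Random(seed)
    states = {}        # item -> set of pending failure identifiers (non-empty only)
    recoveries = []    # heap of (recovery time, failure identifier)
    next_id = 0
    now, area = 0.0, 0.0
    next_update = rng.expovariate(U)
    while True:
        t_rec = recoveries[0][0] if recoveries else float("inf")
        t = min(t_rec, next_update, duration)
        area += len(states) * (t - now)      # integrate the polyvalue count
        now = t
        if t == duration:
            break
        if t_rec <= next_update:
            # Recovery: remove the failure identifier from all item states.
            _, failed = heapq.heappop(recoveries)
            for item in [i for i, s in states.items() if failed in s]:
                states[item].discard(failed)
                if not states[item]:
                    del states[item]
        else:
            # Update: merge the states of d random items into a random target.
            next_update = now + rng.expovariate(U)
            d = rng.randint(0, 2 * D)        # assumed distribution with mean D
            target = rng.randrange(I)
            new_state = set()
            for _ in range(d):
                new_state |= states.get(rng.randrange(I), set())
            if rng.random() >= Y:            # with probability 1 - Y
                new_state |= states.get(target, set())
            if rng.random() < W:             # the update fails
                new_state.add(next_id)
                heapq.heappush(recoveries, (now + rng.expovariate(R), next_id))
                next_id += 1
            if new_state:
                states[target] = new_state
            else:
                states.pop(target, None)
    return area / duration
```

A call such as `simulate(U=2, W=0.01, I=10_000, R=0.01, Y=0, D=1, duration=20_000)` corresponds to a parameter setting for which the model predicts about 2.04 polyvalues. The exact output depends on the random seed and on the distributional assumptions listed above.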

The limits of the simulating program prevent the simulation of very large data bases,
or very high update rates. However, for the parameters that can easily be simulated, the
simulation agrees well with the predictions of the model. Table B.2 contains the results of
the simulation for some sample parameter settings. The numbers of polyvalues obtained
through the simulation were in general somewhat smaller than those predicted by the
analysis. This difference is primarily due to the fact that the rate at which polyvalues are
created is smaller than that predicted by the model, because the target of a failed update or
the target of a propagation may already have a polyvalue.

In conclusion, these results show that the polyvalue scheme is feasible for preventing
delay due to locking, provided that reasonable measures are taken to minimize the number
of failures that introduce polyvalues.



Table B.2
Results of the Simulation of Polyvalues

                Parameters                    Predicted      Actual

    U      W        I        R      Y    D        P             P

    2    0.01    10,000    0.01     0    1       2.04          2.00
    5    0.01    10,000    0.01     0    1       5.26          2.71
   10    0.01    10,000    0.01     0    1      11.11          9.5
   10    0.001   10,000    0.01     0    1       1.11          0.74
   10    0.01    10,000    0.01     0    5      20         [illegible]
   10    0.01    10,000    0.01     1    5      16.7       [illegible]

Biographical Note



Warren Montgomery was born on March 29, 1951 in Highland Park, Illinois. He
grew up in Deerfield, Illinois, where he graduated from Deerfield High School in 1969. He
received the Rensselaer Science Medal and placed 35th nationally in the M.A.A. high school
mathematics competition.

From 1969 to 1973, Warren attended Dartmouth College in Hanover, New Hampshire.
At Dartmouth, he worked part time on the operating system for the Dartmouth
Time-Sharing System. He received the John G. Kemeny prize in computing for his
contributions to this system. He graduated summa cum laude, with a double major in
Mathematics and in Engineering. His thesis was entitled "Non-Adiabatic Behavior of
Charged Particles in a Magnetic Null Sheet".

From 1973 through 1978 Mr. Montgomery has been a graduate student at the 
Massachusetts Institute of Technology. He was supported for the first three years as a 
National Science Foundation Fellow. In 1976, he was awarded simultaneous degrees of 
Master of Science and Electrical Engineer. His S.M. and E.E. thesis was entitled "A Secure 
and Flexible Model of Process Initiation for a Computer Utility". 

From 1976 to September of 1979 Mr. Montgomery was supported as a research
assistant in the Computer Systems Research Group of the M.I.T. Laboratory for Computer
Science. In the summer of 1977, he also worked for Prime Computer Company, designing a
communication protocol for a high-speed inter-processor communication network.

Mr. Montgomery is a member of the Association for Computing Machinery, and its 
special interest groups on Operating Systems and Programming Languages. He is also a 
member of the Sigma Xi scientific honorary society. 

Mr. Montgomery is currently working as a member of the technical staff of Bell
Telephone Laboratories, investigating the design of software to support new home
communication services. He is married to the former Carta P. Westlund.



